//nolint
/*
Copyright (c) 2013-2016 Errplane Inc.
This code is originally from: https://github.com/influxdata/influxdb/blob/1.7/services/meta/client.go

2022.01.23 change http to rpc
Add TagKeys,FieldKeys etc.
Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.
*/

package metaclient

import (
	"bytes"
	crand "crypto/rand"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/influxdata/influxdb/models"
	originql "github.com/influxdata/influxql"
	"github.com/openGemini/openGemini/app/ts-meta/meta/message"
	"github.com/openGemini/openGemini/engine/executor/spdy"
	"github.com/openGemini/openGemini/engine/executor/spdy/transport"
	"github.com/openGemini/openGemini/engine/op"
	"github.com/openGemini/openGemini/lib/config"
	"github.com/openGemini/openGemini/lib/errno"
	"github.com/openGemini/openGemini/lib/logger"
	"github.com/openGemini/openGemini/lib/rand"
	"github.com/openGemini/openGemini/lib/statisticsPusher/statistics"
	util "github.com/openGemini/openGemini/lib/util"
	set "github.com/openGemini/openGemini/open_src/github.com/deckarep/golang-set"
	"github.com/openGemini/openGemini/open_src/golang.org/x/crypto/pbkdf2"
	"github.com/openGemini/openGemini/open_src/influx/influxql"
	meta2 "github.com/openGemini/openGemini/open_src/influx/meta"
	proto2 "github.com/openGemini/openGemini/open_src/influx/meta/proto"
	"github.com/openGemini/openGemini/open_src/influx/query"
	"github.com/openGemini/openGemini/open_src/vm/protoparser/influx"
	"go.uber.org/zap"
	"golang.org/x/text/encoding/unicode"
	"golang.org/x/text/transform"
)

const (
	// errSleep is the time to sleep after we've failed on every metaserver
	// before making another pass
	errSleep = time.Second

	// SaltBytes is the number of bytes used for salts.
	SaltBytes = 32

	// pbkdf2Iter is the iteration count used by PBKDF2 password hashing.
	pbkdf2Iter = 4096

	// pbkdf2KeyLen is the byte length of the key derived by PBKDF2.
	pbkdf2KeyLen = 32

	// hashAlgoVerOne tags stored passwords hashed with salted SHA256 (see genHashPwdVal).
	hashAlgoVerOne = "#Ver:001#"
	// hashAlgoVerTwo tags stored passwords hashed with PBKDF2 (see genPbkdf2PwdVal).
	hashAlgoVerTwo = "#Ver:002#"

	// maxDbOrRpName bounds database / retention policy name length. It is
	// compared against strings.Count(name, ""), which is rune count + 1,
	// so the effective maximum is 255 runes.
	maxDbOrRpName = 256

	// ShardGroupDeletedExpiration is the amount of time before a shard group info will be removed from cached
	// data after it has been marked deleted (2 weeks).
	ShardGroupDeletedExpiration = -2 * 7 * 24 * time.Hour
	IndexGroupDeletedExpiration = -2 * 7 * 24 * time.Hour

	RPCReqTimeout       = 10 * time.Second
	HttpReqTimeout      = 10 * time.Second
	HttpSnapshotTimeout = 4 * time.Second

	// Limits used to lock out users after repeated authentication failures.
	maxLoginLimit      = 5    // Maximum number of login attempts
	authFailCacheLimit = 200  // Size of the channel for processing authentication failures.
	lockUserTime       = 30   // User lock duration, in seconds.
	maxLoginValidTime  = 3600 // Validity duration of login records, in seconds.

	minUsernameLen = 4   // Minimum username length
	maxUsernameLen = 100 // Maximum username length

	minPasswordLen = 8   // Minimum password length
	maxPasswordLen = 256 // Maximum password length
)

// Role identifies which component of the cluster this meta client runs in;
// it selects the snapshot/poll behavior (see Open for SQL, OpenAtStore for STORE).
type Role int

const (
	SQL Role = iota
	STORE
	META
)

var (
	// ErrNameTooLong is returned when a database or retention policy name is
	// too long. The enforced limit is maxDbOrRpName (256, via
	// strings.Count(name, "") in CreateDatabase / CreateRetentionPolicy);
	// the message previously claimed "64 characters", which did not match
	// the check, and has been corrected to 256.
	ErrNameTooLong          = errors.New("database name must have fewer than 256 characters")
	RetryGetUserInfoTimeout = 5 * time.Second
	RetryExecTimeout        = 60 * time.Second
	RetryReportTimeout      = 60 * time.Second
)

// DefaultTypeMapper resolves expression/call types by consulting, in order,
// the operator registry, math functions, general functions, and string functions.
var DefaultTypeMapper = influxql.MultiTypeMapper(
	op.TypeMapper{},
	query.MathTypeMapper{},
	query.FunctionTypeMapper{},
	query.StringFunctionTypeMapper{},
)

// DefaultMetaClient is the first Client ever constructed by NewClient
// (assigned exactly once; see cliOnce).
var DefaultMetaClient *Client

// cliOnce guards the one-time assignment of DefaultMetaClient.
var cliOnce sync.Once

// StorageNodeInfo holds the write (insert) and query (select) addresses of a
// storage node.
type StorageNodeInfo struct {
	InsertAddr string
	SelectAddr string
}

// FieldKey is a field name paired with its field type.
type FieldKey struct {
	Field     string
	FieldType int32
}

// FieldKeys implements sort.Interface, ordering by field name ascending.
type FieldKeys []FieldKey

func (a FieldKeys) Len() int           { return len(a) }
func (a FieldKeys) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a FieldKeys) Less(i, j int) bool { return a[i].Field < a[j].Field }

// MetaClient is an interface for accessing meta data.
// *Client is the concrete implementation; the methods mirror the DDL/DCL
// surface of the meta service (databases, retention policies, measurements,
// users, shards, streams and down-sampling).
type MetaClient interface {
	// Schema management (databases, retention policies, measurements).
	CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, indexR *meta2.IndexRelation) (*meta2.MeasurementInfo, error)
	AlterShardKey(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo) error
	CreateDatabase(name string) (*meta2.DatabaseInfo, error)
	CreateDatabaseWithRetentionPolicy(name string, spec *meta2.RetentionPolicySpec, shardKey *meta2.ShardKeyInfo) (*meta2.DatabaseInfo, error)
	CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error)
	CreateSubscription(database, rp, name, mode string, destinations []string) error
	CreateUser(name, password string, admin, rwuser bool) (meta2.User, error)
	Databases() map[string]*meta2.DatabaseInfo
	Database(name string) (*meta2.DatabaseInfo, error)
	DataNode(id uint64) (*meta2.DataNode, error)
	DataNodes() ([]meta2.DataNode, error)
	DeleteDataNode(id uint64) error
	DeleteMetaNode(id uint64) error
	DropShard(id uint64) error
	DropDatabase(name string) error
	DropRetentionPolicy(database, name string) error
	DropSubscription(database, rp, name string) error
	DropUser(name string) error
	MetaNodes() ([]meta2.NodeInfo, error)
	RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error)
	SetAdminPrivilege(username string, admin bool) error
	SetPrivilege(username, database string, p originql.Privilege) error
	ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta2.ShardInfo, err error)
	ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta2.ShardGroupInfo, err error)
	TruncateShardGroups(t time.Time) error
	UpdateRetentionPolicy(database, name string, rpu *meta2.RetentionPolicyUpdate, makeDefault bool) error
	UpdateUser(name, password string) error
	UserPrivilege(username, database string) (*originql.Privilege, error)
	UserPrivileges(username string) (map[string]originql.Privilege, error)
	Users() []meta2.UserInfo
	// Two-phase delete: Mark* flags objects as deleted; physical removal is async.
	MarkDatabaseDelete(name string) error
	MarkRetentionPolicyDelete(database, name string) error
	MarkMeasurementDelete(database, mst string) error
	DBPtView(database string) (meta2.DBPtInfos, error)
	ShardOwner(shardID uint64) (database, policy string, sgi *meta2.ShardGroupInfo)
	Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
	Schema(database string, retentionPolicy string, mst string) (fields map[string]int32, dimensions map[string]struct{}, err error)
	GetMeasurements(m *influxql.Measurement) ([]*meta2.MeasurementInfo, error)
	TagKeys(database string) map[string]set.Set
	FieldKeys(database string, ms influxql.Measurements) (map[string]map[string]int32, error)
	QueryTagKeys(database string, ms influxql.Measurements, cond influxql.Expr) (map[string]map[string]struct{}, error)
	MatchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error)
	Measurements(database string, ms influxql.Measurements) ([]string, error)
	// SHOW-style introspection helpers (render cached meta data as rows).
	ShowShards() models.Rows
	ShowShardGroups() models.Rows
	ShowSubscriptions() models.Rows
	ShowRetentionPolicies(database string) (models.Rows, error)
	GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int
	NewDownSamplePolicy(database, name string, info *meta2.DownSamplePolicyInfo) error
	DropDownSamplePolicy(database, name string, dropAll bool) error
	ShowDownSamplePolicies(database string) (models.Rows, error)
	GetMstInfoWithInRp(dbName, rpName string, dataTypes []int64) (*meta2.RpMeasurementsFieldsInfo, error)
	AdminUserExists() bool
	Authenticate(username, password string) (u meta2.User, e error)
	UpdateUserInfo()
	UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
	OpenAtStore()
	UpdateStreamMstSchema(database string, retentionPolicy string, mst string, stmt *influxql.SelectStatement) error
	CreateStreamPolicy(info *meta2.StreamInfo) error
	GetStreamInfos() map[string]*meta2.StreamInfo
	GetStreamInfosStore() map[string]*meta2.StreamInfo
	ShowStreams(database string, showAll bool) (models.Rows, error)
	DropStream(name string) error
	GetMeasurementInfoStore(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error)
}

// LoadCtx carries load-report plumbing: a channel of per-DBPT report contexts
// plus a pool that recycles those contexts between reports.
type LoadCtx struct {
	LoadCh    chan *DBPTCtx
	ReportCtx sync.Pool
}

// GetReportCtx hands out a report context, reusing a pooled one when available.
func (ctx *LoadCtx) GetReportCtx() *DBPTCtx {
	if v := ctx.ReportCtx.Get(); v != nil {
		return v.(*DBPTCtx)
	}
	return &DBPTCtx{}
}

// PutReportCtx zeroes the identifying fields of dbPTCtx, recycles its RpStats
// slice into the per-context pool, and returns the context to ReportCtx.
func (ctx *LoadCtx) PutReportCtx(dbPTCtx *DBPTCtx) {
	dbPTCtx.DBPTStat.DB = proto.String("")
	dbPTCtx.DBPTStat.PtID = proto.Uint32(0)
	dbPTCtx.putRpStat(&dbPTCtx.DBPTStat.RpStats)
	ctx.ReportCtx.Put(dbPTCtx)
}

// DBPTCtx is a reusable container for one database-PT status report.
type DBPTCtx struct {
	DBPTStat     *proto2.DBPtStatus
	RpStatusPool sync.Pool // scratch pool of []*proto2.RpShardStatus slices
}

// GetDBPTStat lazily allocates and returns the status record.
func (r *DBPTCtx) GetDBPTStat() *proto2.DBPtStatus {
	if r.DBPTStat != nil {
		return r.DBPTStat
	}
	r.DBPTStat = &proto2.DBPtStatus{}
	return r.DBPTStat
}

// GetRpStat hands out an RpShardStatus slice, reusing a pooled one when available.
func (r *DBPTCtx) GetRpStat() []*proto2.RpShardStatus {
	if v := r.RpStatusPool.Get(); v != nil {
		return v.([]*proto2.RpShardStatus)
	}
	return []*proto2.RpShardStatus{}
}

// putRpStat zeroes every RpShardStatus in *rss, truncates the slice, and
// returns it to the pool so the backing array can be reused.
// NOTE(review): the pool stores the slice header by value (staticcheck
// SA6002); pooling a pointer to the slice would avoid an allocation per Put.
func (r *DBPTCtx) putRpStat(rss *[]*proto2.RpShardStatus) {
	for i := range *rss {
		(*rss)[i].RpName = proto.String("")
		(*rss)[i].ShardStats.ShardID = proto.Uint64(0)
		(*rss)[i].ShardStats.ShardSize = proto.Uint64(0)
		(*rss)[i].ShardStats.SeriesCount = proto.Int32(0)
		(*rss)[i].ShardStats.MaxTime = proto.Int64(0)
	}
	*rss = (*rss)[:0]
	r.RpStatusPool.Put(*rss)
}

// String renders the wrapped status record, or "" when none is set.
func (r *DBPTCtx) String() string {
	if r.DBPTStat != nil {
		return r.DBPTStat.String()
	}
	return ""
}

// Client is used to execute commands on and read data from
// a meta service cluster.
type Client struct {
	tls            bool
	logger         *logger.Logger
	nodeID         uint64 // ID assigned by the meta service (set in CreateDataNode)
	ShardDurations map[uint64]*meta2.ShardDurationInfo

	mu          sync.RWMutex // guards metaServers, closing, changed and cacheData
	metaServers []string
	closing     chan struct{}      // closed once by Close to signal shutdown
	changed     chan chan struct{} // buffered by maxConcurrentWriteLimit (see NewClient)
	cacheData   *meta2.Data        // local snapshot of cluster meta data

	// Authentication cache.
	authCache map[string]authUser

	weakPwdPath string // path of the weak-password dictionary

	retentionAutoCreate bool

	ShardTier uint64 // storage tier, set via SetTier

	// auth fail lock user
	arChan       chan *authRcd // queue of auth outcomes (capacity authFailCacheLimit)
	muAuthData   sync.RWMutex  // presumably guards the two maps below — TODO confirm in consumers
	authFailRcds map[string]authFailCache
	authSuccRcds map[string]time.Time
}

// authRcd is a single authentication outcome queued on Client.arChan.
type authRcd struct {
	user      string
	result    bool // auth result(succ or fail)
	occurTime time.Time
}

// authFailCache accumulates recent failed-authentication timestamps for one user.
type authFailCache struct {
	user         string
	occurTimeLst []time.Time // auth time list
}

// authUser is a cached credential entry: bhash is the stored (versioned) hash
// string, salt/hash a cached salted digest — consumers are outside this view.
type authUser struct {
	bhash string
	salt  []byte
	hash  []byte
}

// NewClient returns a new *Client.
//
// weakPwdPath locates the weak-password dictionary; retentionAutoCreate
// enables automatic retention policy creation; maxConcurrentWriteLimit sizes
// the buffer of the `changed` notification channel. The first client ever
// built also becomes DefaultMetaClient (set exactly once via cliOnce).
func NewClient(weakPwdPath string, retentionAutoCreate bool, maxConcurrentWriteLimit int) *Client {
	cli := &Client{
		cacheData:           &meta2.Data{},
		closing:             make(chan struct{}),
		changed:             make(chan chan struct{}, maxConcurrentWriteLimit),
		authCache:           make(map[string]authUser),
		weakPwdPath:         weakPwdPath,
		retentionAutoCreate: retentionAutoCreate,
		logger:              logger.NewLogger(errno.ModuleMetaClient).With(zap.String("service", "metaclient")),
		arChan:              make(chan *authRcd, authFailCacheLimit),
		authFailRcds:        make(map[string]authFailCache),
		authSuccRcds:        make(map[string]time.Time),
	}
	cliOnce.Do(func() {
		DefaultMetaClient = cli
	})

	return cli
}

// Open a connection to a meta service cluster (SQL role).
// It blocks until an initial meta snapshot is fetched, then starts background
// goroutines that poll for updates and refresh the authentication cache.
func (c *Client) Open() error {
	c.cacheData = c.retryUntilSnapshot(SQL, 0)
	go c.pollForUpdates(SQL)

	go c.updateAuthCacheData()
	return nil
}

// OpenAtStore opens the connection in the STORE role: fetch an initial
// snapshot, then poll for updates and verify data node status in background.
func (c *Client) OpenAtStore() {
	c.cacheData = c.retryUntilSnapshot(STORE, 0)
	go c.pollForUpdates(STORE)
	go c.verifyDataNodeStatus()
}

// Close the meta service cluster connection.
// Safe to call more than once: the closing channel is closed exactly once.
func (c *Client) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Release idle HTTP connections held by the default transport.
	if t, ok := http.DefaultTransport.(*http.Transport); ok {
		t.CloseIdleConnections()
	}

	// If closing is already closed the receive succeeds and we return without
	// closing again (a double close would panic).
	select {
	case <-c.closing:
		return nil
	default:
		close(c.closing)
	}

	return nil
}

// NodeID returns the node ID assigned to this client.
func (c *Client) NodeID() uint64 {
	return c.nodeID
}

// SetTier records the storage tier parsed from tier; an unrecognized name
// yields an error (ShardTier is still assigned the sentinel TierBegin value,
// matching the original behavior).
func (c *Client) SetTier(tier string) error {
	c.ShardTier = meta2.StringToTier(strings.ToUpper(tier))
	if c.ShardTier != util.TierBegin {
		return nil
	}
	return fmt.Errorf("invalid tier %s", tier)
}

// SetMetaServers updates the meta servers on the client.
// Every address is also registered with the shared meta-node transport
// manager, keyed by its slice index. NOTE(review): a second call re-Adds the
// same indexes — confirm Add is idempotent for existing keys.
func (c *Client) SetMetaServers(a []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metaServers = a

	for i, server := range a {
		transport.NewMetaNodeManager().Add(uint64(i), server)
	}
}

// SetTLS sets whether the client should use TLS when connecting.
// This function is not safe for concurrent use.
func (c *Client) SetTLS(v bool) {
	c.tls = v
}

// Ping will hit the ping endpoint for the metaservice and return nil if
// it returns 200. If checkAllMetaServers is set to true, it will hit the
// ping endpoint and tell it to verify the health of all metaservers in the
// cluster.
// On a successful round trip it returns a non-nil error carrying the leader
// address from the callback — existing behavior, preserved here.
func (c *Client) Ping(checkAllMetaServers bool) error {
	all := 0
	if checkAllMetaServers {
		all = 1
	}
	callback := &PingCallback{}
	msg := message.NewMetaMessage(message.PingRequestMessage, &message.PingRequest{All: all})
	err := c.SendRPCMsg(0, msg, callback)
	if err != nil {
		return err
	}
	// Bug fix: fmt.Errorf(string(callback.Leader)) treated the leader address
	// as a format string (go vet printf); any '%' in it would be mangled.
	return errors.New(string(callback.Leader))
}

// ClusterID returns the ID of the cluster it's connected to.
func (c *Client) ClusterID() uint64 {
	c.mu.RLock()
	id := c.cacheData.ClusterID
	c.mu.RUnlock()
	return id
}

// DataNode returns the cached data node with the given id, or
// meta2.ErrNodeNotFound when no node matches.
func (c *Client) DataNode(id uint64) (*meta2.DataNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	nodes := c.cacheData.DataNodes
	for i := range nodes {
		if nodes[i].ID == id {
			return &nodes[i], nil
		}
	}
	return nil, meta2.ErrNodeNotFound
}

// DataNodes returns the data nodes' info from the local cache.
func (c *Client) DataNodes() ([]meta2.DataNode, error) {
	c.mu.RLock()
	nodes := c.cacheData.DataNodes
	c.mu.RUnlock()
	return nodes, nil
}

// CreateDataNode will create a new data node in the metastore.
// It retries forever — rotating through meta servers and sleeping errSleep
// between attempts — until a node ID is assigned or the client is closed.
// Returns the assigned node ID and the meta server's logical time (LTime).
func (c *Client) CreateDataNode(writeHost, queryHost string) (uint64, uint64, error) {
	currentServer := connectedServer
	for {
		// exit if we're closed
		select {
		case <-c.closing:
			return 0, 0, meta2.ErrClientClosed
		default:
			// we're still open, continue on
		}
		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()

		node, err := c.getNode(currentServer, writeHost, queryHost)

		if err == nil && node.NodeId > 0 {
			// NOTE(review): nodeID and ShardDurations are written without
			// holding c.mu — confirm no concurrent readers during startup.
			c.nodeID = node.NodeId
			c.ShardDurations = node.ShardDurationInfos
			return c.nodeID, node.LTime, nil
		}

		c.logger.Warn("get node failed", zap.Error(err), zap.String("writeHost", writeHost), zap.String("queryHost", queryHost))
		time.Sleep(errSleep)

		currentServer++
	}
}

// DataNodeByHTTPHost returns the data node with the given http bind address,
// or meta2.ErrNodeNotFound. The result is a copy of the cached entry.
func (c *Client) DataNodeByHTTPHost(httpAddr string) (*meta2.DataNode, error) {
	nodes, err := c.DataNodes()
	if err != nil {
		return nil, err
	}
	for i := range nodes {
		if nodes[i].Host == httpAddr {
			node := nodes[i]
			return &node, nil
		}
	}

	return nil, meta2.ErrNodeNotFound
}

// DataNodeByTCPHost returns the data node with the given tcp bind address,
// or meta2.ErrNodeNotFound. The result is a copy of the cached entry.
func (c *Client) DataNodeByTCPHost(tcpAddr string) (*meta2.DataNode, error) {
	nodes, err := c.DataNodes()
	if err != nil {
		return nil, err
	}
	for i := range nodes {
		if nodes[i].TCPHost == tcpAddr {
			node := nodes[i]
			return &node, nil
		}
	}

	return nil, meta2.ErrNodeNotFound
}

// DeleteDataNode deletes a data node from the cluster.
func (c *Client) DeleteDataNode(id uint64) error {
	return c.retryUntilExec(proto2.Command_DeleteDataNodeCommand, proto2.E_DeleteDataNodeCommand_Command,
		&proto2.DeleteDataNodeCommand{
			ID: proto.Uint64(id),
		})
}

// MetaNodes returns the meta nodes' info from the local cache.
func (c *Client) MetaNodes() ([]meta2.NodeInfo, error) {
	c.mu.RLock()
	nodes := c.cacheData.MetaNodes
	c.mu.RUnlock()
	return nodes, nil
}

// MetaNodeByAddr returns the meta node whose host matches addr, or nil.
// The result is a copy of the cached entry.
func (c *Client) MetaNodeByAddr(addr string) *meta2.NodeInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	for i := range c.cacheData.MetaNodes {
		if c.cacheData.MetaNodes[i].Host == addr {
			node := c.cacheData.MetaNodes[i]
			return &node
		}
	}
	return nil
}

// FieldKeys returns, for each matched measurement (keyed by its original
// name), a map of field name -> field type. Returns (nil, nil) when no
// measurement matches ms in database.
func (c *Client) FieldKeys(database string, ms influxql.Measurements) (map[string]map[string]int32, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	mis, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	if len(mis) == 0 {
		return nil, nil
	}

	ret := make(map[string]map[string]int32, len(mis))
	for _, m := range mis {
		ret[m.OriginName()] = make(map[string]int32)
		// MeasurementInfo.FieldKeys fills the entry just created for this
		// measurement in ret.
		m.FieldKeys(ret)
	}
	return ret, nil
}

// TagKeys returns, for every measurement in database, the set of schema keys
// typed as tags. Results for measurements sharing a name across retention
// policies are unioned under one map key. An unknown database yields an
// empty map.
func (c *Client) TagKeys(database string) map[string]set.Set {
	c.mu.RLock()
	defer c.mu.RUnlock()
	uniqueMap := make(map[string]set.Set)
	dbi := c.cacheData.Database(database)
	if dbi == nil {
		// Bug fix: Database returns nil for an unknown name (see the nil
		// check in RetentionPolicy); walking a nil DatabaseInfo would panic.
		return uniqueMap
	}

	dbi.WalkRetentionPolicy(func(rp *meta2.RetentionPolicyInfo) {
		rp.EachMeasurements(func(mst *meta2.MeasurementInfo) {
			s := set.NewSet()
			for key, typ := range mst.Schema {
				if typ == influx.Field_Type_Tag {
					s.Add(key)
				}
			}
			if existing, ok := uniqueMap[mst.Name]; ok {
				uniqueMap[mst.Name] = existing.Union(s)
				return
			}
			uniqueMap[mst.Name] = s
		})
	})

	return uniqueMap
}

// GetMeasurements resolves m against the cached meta data.
// With a regex it returns every matching measurement in the retention policy,
// sorted by original name; otherwise it returns the single named measurement
// (or the lookup error).
func (c *Client) GetMeasurements(m *influxql.Measurement) ([]*meta2.MeasurementInfo, error) {
	var measurements []*meta2.MeasurementInfo
	c.mu.RLock()
	defer c.mu.RUnlock()
	dbi, err := c.cacheData.GetDatabase(m.Database)
	if err != nil {
		return nil, err
	}

	rpi, err := dbi.GetRetentionPolicy(m.RetentionPolicy)
	if err != nil {
		return nil, err
	}

	if m.Regex != nil {
		// Regex source: collect all matches, then sort for deterministic output.
		rpi.EachMeasurements(func(msti *meta2.MeasurementInfo) {
			if m.Regex.Val.Match([]byte(influx.GetOriginMstName(msti.Name))) {
				measurements = append(measurements, msti)
			}
		})
		sort.Slice(measurements, func(i, j int) bool {
			return influx.GetOriginMstName(measurements[i].Name) < influx.GetOriginMstName(measurements[j].Name)
		})
	} else {
		msti, err := rpi.GetMeasurement(m.Name)
		if err != nil {
			return nil, err
		}
		measurements = append(measurements, msti)
	}
	return measurements, nil
}

// Measurement looks up a measurement in the cached meta data.
func (c *Client) Measurement(database string, rpName string, mstName string) (*meta2.MeasurementInfo, error) {
	c.mu.RLock()
	mst, err := c.cacheData.Measurement(database, rpName, mstName)
	c.mu.RUnlock()
	return mst, err
}

// RetryGetMeasurementInfoStore fetches serialized measurement info from the
// meta store, rotating through meta servers until success, client close, or
// an overall timeout of len(metaServers)*HttpReqTimeout.
func (c *Client) RetryGetMeasurementInfoStore(database string, rpName string, mstName string) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var info []byte
	for {
		c.mu.RLock()
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, errors.New("GetMeasurementInfoStore fail")
		default:

		}

		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		info, err = c.getMeasurementInfo(currentServer, database, rpName, mstName)
		if err == nil {
			break
		}

		// Give up after one full timeout window per configured meta server.
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	// Bug fix: the original returned `info, nil`, silently discarding the
	// last RPC error on timeout and handing callers nil bytes with nil error.
	return info, err
}

// getMeasurementInfo performs one GetMeasurementInfo RPC against the meta
// server at index currentServer and returns the raw payload.
func (c *Client) getMeasurementInfo(currentServer int, database string, rpName string, mstName string) ([]byte, error) {
	req := &message.GetMeasurementInfoRequest{DbName: database, RpName: rpName, MstName: mstName}
	callback := &GetMeasurementInfoCallback{}
	if err := c.SendRPCMsg(currentServer, message.NewMetaMessage(message.GetMeasurementInfoRequestMessage, req), callback); err != nil {
		c.logger.Error("GetMeasurementInfoR SendRPCMsg fail", zap.Error(err))
		return nil, err
	}
	return callback.Data, nil
}

// GetMeasurementInfoStore fetches a measurement's info directly from the meta
// store (bypassing the local cache) and unmarshals it.
func (c *Client) GetMeasurementInfoStore(dbName string, rpName string, mstName string) (*meta2.MeasurementInfo, error) {
	// Dead code removed: the original built a proto2.GetMeasurementInfoStoreCommand
	// and attached it via proto.SetExtension, but never sent it anywhere —
	// the RPC issued below carries the parameters itself.
	b, err := c.RetryGetMeasurementInfoStore(dbName, rpName, mstName)
	if err != nil {
		return nil, err
	}
	mst := &meta2.MeasurementInfo{}
	if err = mst.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	return mst, nil
}

// Database returns info for the requested database.
func (c *Client) Database(name string) (*meta2.DatabaseInfo, error) {
	c.mu.RLock()
	db, err := c.cacheData.GetDatabase(name)
	c.mu.RUnlock()
	return db, err
}

// Databases returns all cached database infos, keyed by name.
func (c *Client) Databases() map[string]*meta2.DatabaseInfo {
	c.mu.RLock()
	dbs := c.cacheData.Databases
	c.mu.RUnlock()
	return dbs
}

// ShowShards renders the cached shard metadata as display rows.
func (c *Client) ShowShards() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowShards()
}

// ShowShardGroups renders the cached shard group metadata as display rows.
func (c *Client) ShowShardGroups() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowShardGroups()
}

// ShowSubscriptions renders the cached subscriptions as display rows.
func (c *Client) ShowSubscriptions() models.Rows {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowSubscriptions()
}

// ShowRetentionPolicies renders the retention policies of database as rows.
func (c *Client) ShowRetentionPolicies(database string) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowRetentionPolicies(database)
}

// Schema splits a measurement's schema into fields (name -> type) and
// dimensions (tag key set).
func (c *Client) Schema(database string, retentionPolicy string, mst string) (fields map[string]int32,
	dimensions map[string]struct{}, err error) {
	msti, err := c.Measurement(database, retentionPolicy, mst)
	if err != nil {
		return nil, nil, err
	}

	fields = make(map[string]int32)
	dimensions = make(map[string]struct{})
	for key, typ := range msti.Schema {
		if typ == influx.Field_Type_Tag {
			dimensions[key] = struct{}{}
		} else {
			fields[key] = typ
		}
	}
	return fields, dimensions, nil
}

// UpdateSchema asks the meta service to add fieldToCreate to a measurement's
// schema.
func (c *Client) UpdateSchema(database string, retentionPolicy string, mst string, fieldToCreate []*proto2.FieldSchema) error {
	return c.retryUntilExec(proto2.Command_UpdateSchemaCommand, proto2.E_UpdateSchemaCommand_Command,
		&proto2.UpdateSchemaCommand{
			Database:      proto.String(database),
			RpName:        proto.String(retentionPolicy),
			Measurement:   proto.String(mst),
			FieldToCreate: fieldToCreate,
		})
}

// CreateMeasurement creates measurement mst under database/retentionPolicy,
// or returns the existing one when it already exists with a matching latest
// shard key. An existing measurement with a different shard key yields
// meta2.ErrMeasurementExists.
func (c *Client) CreateMeasurement(database string, retentionPolicy string, mst string, shardKey *meta2.ShardKeyInfo, indexR *meta2.IndexRelation) (*meta2.MeasurementInfo, error) {
	msti, err := c.Measurement(database, retentionPolicy, mst)
	if msti != nil {
		// check shardkey equal or not
		n := len(msti.ShardKeys)
		if n == 0 || !shardKey.EqualsToAnother(&msti.ShardKeys[n-1]) {
			return nil, meta2.ErrMeasurementExists
		}
		return msti, nil
	}
	if err != meta2.ErrMeasurementNotFound {
		return nil, err
	}

	if !meta2.ValidMeasurementName(mst) {
		return nil, errno.NewError(errno.InvalidMeasurement, mst)
	}

	cmd := &proto2.CreateMeasurementCommand{
		DBName: proto.String(database),
		RpName: proto.String(retentionPolicy),
		Name:   proto.String(mst),
	}

	if shardKey != nil {
		cmd.Ski = shardKey.Marshal()
	}

	if indexR != nil {
		// Dead branch removed: msti is always nil here (a non-nil msti
		// returned above), so `if msti == nil {0} else {1}` was constant.
		indexR.Rid = 0
		cmd.IR = indexR.Marshal()
	}
	err = c.retryUntilExec(proto2.Command_CreateMeasurementCommand, proto2.E_CreateMeasurementCommand_Command, cmd)
	if err != nil {
		return nil, err
	}
	return c.Measurement(database, retentionPolicy, mst)
}

// AlterShardKey changes the shard key of an existing measurement; the
// measurement must already exist.
func (c *Client) AlterShardKey(database, retentionPolicy, mst string, shardKey *meta2.ShardKeyInfo) error {
	if _, err := c.Measurement(database, retentionPolicy, mst); err != nil {
		return err
	}

	return c.retryUntilExec(proto2.Command_AlterShardKeyCmd, proto2.E_AlterShardKeyCmd_Command,
		&proto2.AlterShardKeyCmd{
			DBName: proto.String(database),
			RpName: proto.String(retentionPolicy),
			Name:   proto.String(mst),
			Ski:    shardKey.Marshal(),
		})
}

// CreateDatabase creates a database or returns it if it already exists.
// Name length is checked via strings.Count(name, ""), i.e. rune count + 1,
// against maxDbOrRpName.
func (c *Client) CreateDatabase(name string) (*meta2.DatabaseInfo, error) {
	if strings.Count(name, "") > maxDbOrRpName {
		return nil, ErrNameTooLong
	}
	db, err := c.Database(name)
	// Any error other than "not found" (and any existing db) short-circuits.
	if db != nil || !errno.Equal(err, errno.DatabaseNotFound) {
		return db, err
	}

	cmd := &proto2.CreateDatabaseCommand{
		Name: proto.String(name),
	}

	err = c.retryUntilExec(proto2.Command_CreateDatabaseCommand, proto2.E_CreateDatabaseCommand_Command, cmd)
	if err != nil {
		return nil, err
	}

	return c.Database(name)
}

// CreateDatabaseWithRetentionPolicy creates a database with the specified
// retention policy.
//
// When creating a database with a retention policy, the retention policy will
// always be set to default. Therefore if the caller provides a retention policy
// that already exists on the database, but that retention policy is not the
// default one, an error will be returned.
//
// This call is only idempotent when the caller provides the exact same
// retention policy, and that retention policy is already the default for the
// database.
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *meta2.RetentionPolicySpec, shardKey *meta2.ShardKeyInfo) (*meta2.DatabaseInfo, error) {
	if spec == nil {
		return nil, errors.New("CreateDatabaseWithRetentionPolicy called with nil spec")
	}

	rpi := spec.NewRetentionPolicyInfo()
	if err := rpi.CheckSpecValid(); err != nil {
		return nil, err
	}

	db, err := c.Database(name)
	if err != nil && !errno.Equal(err, errno.DatabaseNotFound) {
		return nil, err
	}

	if db != nil {
		// Existing database: acceptable only when both the shard key and the
		// retention policy match what is already there.
		if !db.ShardKey.EqualsToAnother(shardKey) {
			return nil, errno.NewError(errno.ShardKeyConflict)
		}
		if rp := db.RetentionPolicy(rpi.Name); rp != nil {
			if !rp.EqualsAnotherRp(rpi) {
				return nil, meta2.ErrRetentionPolicyConflict
			}
			return db, nil
		}
	}

	cmd := &proto2.CreateDatabaseCommand{
		Name:            proto.String(name),
		RetentionPolicy: rpi.Marshal(),
	}

	// Bug fix: the original dereferenced shardKey.ShardKey unconditionally,
	// which panics when the caller supplies no shard key.
	if shardKey != nil && len(shardKey.ShardKey) > 0 {
		cmd.Ski = shardKey.Marshal()
	}

	err = c.retryUntilExec(proto2.Command_CreateDatabaseCommand, proto2.E_CreateDatabaseCommand_Command, cmd)
	if err != nil {
		return nil, err
	}

	return c.Database(name)
}

// MarkMeasurementDelete flags measurement as deleted in the meta store
// (actual removal happens asynchronously). It scans the database's retention
// policies for the one containing the measurement; if the measurement is
// already marked deleted — or the database does not exist — this is a no-op.
func (c *Client) MarkMeasurementDelete(database, measurement string) error {
	dbi, err := c.Database(database)
	if err != nil {
		return err
	}
	if dbi == nil {
		return nil
	}

	var policy string
	for _, rp := range dbi.RetentionPolicies {
		msti := rp.Measurement(measurement)
		if msti == nil {
			continue
		}
		if msti.MarkDeleted {
			return nil
		}
		policy = rp.Name
		break
	}
	// Note: if no policy contains the measurement, policy is "" and the meta
	// service decides how to handle the command.
	cmd := &proto2.MarkMeasurementDeleteCommand{
		Database:    proto.String(database),
		Policy:      proto.String(policy),
		Measurement: proto.String(measurement),
	}

	return c.retryUntilExec(proto2.Command_MarkMeasurementDeleteCommand, proto2.E_MarkMeasurementDeleteCommand_Command, cmd)
}

// MarkDatabaseDelete flags database name as deleted in the meta store;
// physical removal happens asynchronously.
func (c *Client) MarkDatabaseDelete(name string) error {
	return c.retryUntilExec(proto2.Command_MarkDatabaseDeleteCommand, proto2.E_MarkDatabaseDeleteCommand_Command,
		&proto2.MarkDatabaseDeleteCommand{
			Name: proto.String(name),
		})
}

// MarkRetentionPolicyDelete flags a retention policy as deleted in the meta
// store; physical removal happens asynchronously.
func (c *Client) MarkRetentionPolicyDelete(database, name string) error {
	return c.retryUntilExec(proto2.Command_MarkRetentionPolicyDeleteCommand, proto2.E_MarkRetentionPolicyDeleteCommand_Command,
		&proto2.MarkRetentionPolicyDeleteCommand{
			Database: proto.String(database),
			Name:     proto.String(name),
		})
}

// DropDatabase deletes a database.
func (c *Client) DropDatabase(name string) error {
	return c.retryUntilExec(proto2.Command_DropDatabaseCommand, proto2.E_DropDatabaseCommand_Command,
		&proto2.DropDatabaseCommand{
			Name: proto.String(name),
		})
}

// CreateRetentionPolicy creates a retention policy on the specified database.
// A non-zero duration below meta2.MinRetentionPolicyDuration is rejected;
// the name length check uses strings.Count (rune count + 1, see maxDbOrRpName).
func (c *Client) CreateRetentionPolicy(database string, spec *meta2.RetentionPolicySpec, makeDefault bool) (*meta2.RetentionPolicyInfo, error) {

	if spec.Duration != nil && *spec.Duration < meta2.MinRetentionPolicyDuration && *spec.Duration != 0 {
		return nil, meta2.ErrRetentionPolicyDurationTooLow
	}

	rpi := spec.NewRetentionPolicyInfo()
	if strings.Count(rpi.Name, "") > maxDbOrRpName {
		return nil, ErrNameTooLong
	}
	cmd := &proto2.CreateRetentionPolicyCommand{
		Database:        proto.String(database),
		RetentionPolicy: rpi.Marshal(),
		DefaultRP:       proto.Bool(makeDefault),
	}

	if err := c.retryUntilExec(proto2.Command_CreateRetentionPolicyCommand, proto2.E_CreateRetentionPolicyCommand_Command, cmd); err != nil {
		return nil, err
	}

	return c.RetentionPolicy(database, rpi.Name)
}

// RetentionPolicy returns the requested retention policy info, or a
// DatabaseNotFound error when the database is missing or marked deleted.
func (c *Client) RetentionPolicy(database, name string) (rpi *meta2.RetentionPolicyInfo, err error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	dbInfo := c.cacheData.Database(database)
	if dbInfo != nil && !dbInfo.MarkDeleted {
		return dbInfo.RetentionPolicy(name), nil
	}
	return nil, errno.NewError(errno.DatabaseNotFound, database)
}

// DBPtView returns the partition view of database, or a DatabaseNotFound
// error when the database has no PT view in the cache.
func (c *Client) DBPtView(database string) (meta2.DBPtInfos, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if pts := c.cacheData.DBPtView(database); pts != nil {
		return pts, nil
	}
	return nil, errno.NewError(errno.DatabaseNotFound, database)
}

// DropRetentionPolicy drops a retention policy from a database.
func (c *Client) DropRetentionPolicy(database, name string) error {
	return c.retryUntilExec(proto2.Command_DropRetentionPolicyCommand, proto2.E_DropRetentionPolicyCommand_Command,
		&proto2.DropRetentionPolicyCommand{
			Database: proto.String(database),
			Name:     proto.String(name),
		})
}

// SetDefaultRetentionPolicy sets a database's default retention policy.
func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
	return c.retryUntilExec(proto2.Command_SetDefaultRetentionPolicyCommand, proto2.E_SetDefaultRetentionPolicyCommand_Command,
		&proto2.SetDefaultRetentionPolicyCommand{
			Database: proto.String(database),
			Name:     proto.String(name),
		})
}

// UpdateRetentionPolicy updates a retention policy. Nil fields of rpu leave
// the corresponding attribute unchanged.
func (c *Client) UpdateRetentionPolicy(database, name string, rpu *meta2.RetentionPolicyUpdate, makeDefault bool) error {
	var replicaN *uint32
	if rpu.ReplicaN != nil {
		v := uint32(*rpu.ReplicaN)
		replicaN = &v
	}

	cmd := &proto2.UpdateRetentionPolicyCommand{
		Database:           proto.String(database),
		Name:               proto.String(name),
		NewName:            rpu.Name,
		Duration:           meta2.GetInt64Duration(rpu.Duration),
		ReplicaN:           replicaN,
		ShardGroupDuration: meta2.GetInt64Duration(rpu.ShardGroupDuration),
		MakeDefault:        proto.Bool(makeDefault),
		HotDuration:        meta2.GetInt64Duration(rpu.HotDuration),
		WarmDuration:       meta2.GetInt64Duration(rpu.WarmDuration),
		IndexGroupDuration: meta2.GetInt64Duration(rpu.IndexGroupDuration),
	}

	return c.retryUntilExec(proto2.Command_UpdateRetentionPolicyCommand, proto2.E_UpdateRetentionPolicyCommand_Command, cmd)
}

// IsLeader reports whether this client is the meta leader; it is hard-coded
// to false. (Original comment: "should get rid of this".)
func (c *Client) IsLeader() bool { return false }

// Users returns a slice of UserInfo representing the currently known users;
// never nil.
func (c *Client) Users() []meta2.UserInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.cacheData.Users == nil {
		return []meta2.UserInfo{}
	}
	return c.cacheData.Users
}

// User returns the user with the given name, or meta2.ErrUserNotFound.
// The returned value points at a copy of the cached entry.
func (c *Client) User(name string) (meta2.User, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for i := range c.cacheData.Users {
		if c.cacheData.Users[i].Name == name {
			user := c.cacheData.Users[i]
			return &user, nil
		}
	}

	return nil, meta2.ErrUserNotFound
}

// encryptWithSalt hashes plaintext with the given salt using the algorithm
// selected by hashVer; unknown versions yield nil.
func (c *Client) encryptWithSalt(salt []byte, plaintext string, hashVer string) []byte {
	if hashVer == hashAlgoVerOne {
		return c.hashWithSalt(salt, plaintext)
	}
	if hashVer == hashAlgoVerTwo {
		return c.pbkdf2WithSalt(salt, plaintext)
	}
	return nil
}

// saltedEncryptByVer generates a fresh salt and hashes plaintext with the
// algorithm selected by hashVer; unknown versions yield meta2.ErrUnsupportedVer.
func (c *Client) saltedEncryptByVer(plaintext string, hashVer string) (salt, hash []byte, err error) {
	if hashVer == hashAlgoVerOne {
		return c.saltedHash(plaintext)
	}
	if hashVer == hashAlgoVerTwo {
		return c.saltedPbkdf2(plaintext)
	}
	return nil, nil, meta2.ErrUnsupportedVer
}

// hashWithSalt returns SHA256(salt || password), or nil if writing to the
// hasher fails.
func (c *Client) hashWithSalt(salt []byte, password string) []byte {
	h := sha256.New()
	if _, err := h.Write(salt); err != nil {
		return nil
	}
	if _, err := h.Write([]byte(password)); err != nil {
		return nil
	}
	return h.Sum(nil)
}

// saltedHash generates a cryptographically random salt of SaltBytes bytes and
// returns it together with the salted SHA256 hash of password.
func (c *Client) saltedHash(password string) (salt, hash []byte, err error) {
	salt = make([]byte, SaltBytes)
	_, err = io.ReadFull(crand.Reader, salt)
	if err != nil {
		return nil, nil, err
	}
	return salt, c.hashWithSalt(salt, password), nil
}

// genHashPwdVal builds the stored password value for hash version one:
// version flag + hex(salt) + hex(SHA256(salt||password)).
func (c *Client) genHashPwdVal(password string) (string, error) {
	salt, hashed, err := c.saltedHash(password)
	if err != nil {
		c.logger.Error("saltedHash fail", zap.Error(err))
		return "", err
	}
	// Assemble verFlag + hex-encoded salt + hex-encoded hash.
	return hashAlgoVerOne + fmt.Sprintf("%02X", salt) + fmt.Sprintf("%02X", hashed), nil
}

// pbkdf2WithSalt derives a pbkdf2KeyLen-byte key from password and salt using
// PBKDF2 with SHA256 and pbkdf2Iter iterations.
func (c *Client) pbkdf2WithSalt(salt []byte, password string) []byte {
	return pbkdf2.Key([]byte(password), salt, pbkdf2Iter, pbkdf2KeyLen, sha256.New)
}

// saltedPbkdf2 generates a cryptographically random salt of SaltBytes bytes
// and returns it together with the PBKDF2 derivation of password.
func (c *Client) saltedPbkdf2(password string) (salt, hash []byte, err error) {
	salt = make([]byte, SaltBytes)
	_, err = io.ReadFull(crand.Reader, salt)
	if err != nil {
		return nil, nil, err
	}
	return salt, c.pbkdf2WithSalt(salt, password), nil
}

// genPbkdf2PwdVal builds the stored password value for hash version two:
// version flag + hex(salt) + hex(PBKDF2(password, salt)).
func (c *Client) genPbkdf2PwdVal(password string) (string, error) {
	salt, hashed, err := c.saltedPbkdf2(password)
	if err != nil {
		c.logger.Error("saltedHash fail", zap.Error(err))
		return "", err
	}
	// Assemble verFlag + hex-encoded salt + hex-encoded PBKDF2 output.
	return hashAlgoVerTwo + fmt.Sprintf("%02X", salt) + fmt.Sprintf("%02X", hashed), nil
}

// compareHashAndPwdVerOne verifies plaintext against a version-one stored
// value (verFlag + hex(salt) + hex(SHA256(salt||password))). Returns nil on
// match.
func (c *Client) compareHashAndPwdVerOne(hashed, plaintext string) error {
	prefixLen := len(hashAlgoVerOne)
	if len(hashed) < prefixLen+SaltBytes*2 {
		return meta2.ErrHashedLength
	}
	saltHex := hashed[prefixLen : prefixLen+SaltBytes*2]
	wantHash := hashed[prefixLen+SaltBytes*2:]

	// Decode the hex-encoded salt two characters at a time.
	salt := make([]byte, len(saltHex)/2)
	for i := 0; i+1 < len(saltHex); i += 2 {
		uVal, err := strconv.ParseUint(saltHex[i:i+2], 16, 8) //16 base, 8 bitSize
		if err != nil {
			c.logger.Error("hash pwd VerOne convert str to uint fail", zap.Error(err))
			return err
		}
		salt[i/2] = byte(uVal)
	}

	// Re-hash the candidate with the stored salt and compare hex forms.
	if fmt.Sprintf("%02X", c.hashWithSalt(salt, plaintext)) != wantHash {
		return meta2.ErrMismatchedHashAndPwd
	}
	return nil
}

// compareHashAndPwdVerTwo verifies plaintext against a version-two stored
// value (verFlag + hex(salt) + hex(PBKDF2(password, salt))). Returns nil on
// match.
func (c *Client) compareHashAndPwdVerTwo(hashed, plaintext string) error {
	prefixLen := len(hashAlgoVerTwo)
	if len(hashed) < prefixLen+SaltBytes*2 {
		return meta2.ErrHashedLength
	}
	saltHex := hashed[prefixLen : prefixLen+SaltBytes*2]
	wantHash := hashed[prefixLen+SaltBytes*2:]

	// Decode the hex-encoded salt two characters at a time.
	salt := make([]byte, len(saltHex)/2)
	for i := 0; i+1 < len(saltHex); i += 2 {
		uVal, err := strconv.ParseUint(saltHex[i:i+2], 16, 8) //16 base, 8 bitSize
		if err != nil {
			c.logger.Error("hash pwd VerTwo convert str to uint fail", zap.Error(err))
			return err
		}
		salt[i/2] = byte(uVal)
	}

	// Derive the candidate key with the stored salt and compare hex forms.
	if fmt.Sprintf("%02X", c.pbkdf2WithSalt(salt, plaintext)) != wantHash {
		return meta2.ErrMismatchedHashAndPwd
	}
	return nil
}

// CompareHashAndPlainPwd compares a stored hashed password with a plaintext
// candidate. The leading version flag of hashed selects the hash algorithm.
// Returns nil on success, or an error on failure.
func (c *Client) CompareHashAndPlainPwd(hashed, plaintext string) error {
	verLen := len(hashAlgoVerOne)
	if len(hashed) < verLen {
		return meta2.ErrHashedLength
	}

	switch hashed[:verLen] {
	case hashAlgoVerOne:
		return c.compareHashAndPwdVerOne(hashed, plaintext)
	case hashAlgoVerTwo:
		return c.compareHashAndPwdVerTwo(hashed, plaintext)
	}
	// Unknown version flag: treat as a mismatch.
	return meta2.ErrMismatchedHashAndPwd
}

// CreateUser adds a user with the given name, password, admin and rwuser
// flags. Re-creating an existing user with the same password and admin flag
// is idempotent and returns the user; any other mismatch is ErrUserExists.
// At most one admin user may exist.
func (c *Client) CreateUser(name, password string, admin, rwuser bool) (meta2.User, error) {
	// Validate name length and password strength up front.
	if err := c.isValidName(name); err != nil {
		return nil, err
	}
	if err := c.isValidPwd(password, name); err != nil {
		return nil, err
	}

	snapshot := c.cacheData.Clone()

	// Idempotent path: identical password + admin flag returns the user.
	if u := snapshot.GetUser(name); u != nil {
		if err := c.CompareHashAndPlainPwd(u.Hash, password); err != nil || u.Admin != admin {
			return nil, meta2.ErrUserExists
		}
		return u, nil
	}

	// Forbid creating a second admin user.
	if admin && snapshot.HasAdminUser() {
		return nil, meta2.ErrUserForbidden
	}

	// Hash the password (PBKDF2, version two) before serializing it.
	hash, err := c.genPbkdf2PwdVal(password)
	if err != nil {
		return nil, err
	}

	cmd := &proto2.CreateUserCommand{
		Name:   proto.String(name),
		Hash:   proto.String(hash),
		Admin:  proto.Bool(admin),
		RwUser: proto.Bool(rwuser),
	}
	if err := c.retryUntilExec(proto2.Command_CreateUserCommand, proto2.E_CreateUserCommand_Command, cmd); err != nil {
		return nil, err
	}
	return c.User(name)
}

// UpdateUser updates the password of an existing user.
// The new password must be valid, and must differ from the user's current
// password (reusing it is ErrPwdUsed).
// Idiom fix: the original used an else branch after a terminating if; the
// flow is flattened into guard clauses with identical behavior.
func (c *Client) UpdateUser(name, password string) error {
	// Validate the new password.
	if err := c.isValidPwd(password, name); err != nil {
		return err
	}

	// Hash the password before serializing it.
	hash, err := c.genPbkdf2PwdVal(password)
	if err != nil {
		return err
	}

	u := c.cacheData.GetUser(name)
	if u == nil {
		return meta2.ErrUserNotFound
	}
	// Reject re-use of the current password.
	if err = c.CompareHashAndPlainPwd(u.Hash, password); err == nil {
		return meta2.ErrPwdUsed
	}

	return c.retryUntilExec(proto2.Command_UpdateUserCommand, proto2.E_UpdateUserCommand_Command,
		&proto2.UpdateUserCommand{
			Name: proto.String(name),
			Hash: proto.String(hash),
		},
	)
}

// isValidName checks the username length against the configured bounds
// [minUsernameLen, maxUsernameLen].
func (c *Client) isValidName(user string) error {
	if n := len(user); n < minUsernameLen || n > maxUsernameLen {
		return errno.NewError(errno.InvalidUsernameLen, minUsernameLen, maxUsernameLen)
	}
	return nil
}

// isValidPwd validates a candidate password s for user: length bounds, not
// in the weak-password dictionary, not (a reversal of) the username, and
// meeting the character-class strength rules.
func (c *Client) isValidPwd(s, user string) error {
	if n := len(s); n < minPasswordLen || n > maxPasswordLen {
		return errno.NewError(errno.InvalidPwdLen, minPasswordLen, maxPasswordLen)
	}

	// Dictionary read errors are deliberately ignored: a missing/unreadable
	// dict file must not block password changes.
	if weak, _ := c.isInWeakPwdDict(s); weak {
		return errno.NewError(errno.InvalidPwdComplex)
	}

	if c.isSimilarToUserName(s, user) {
		return errno.NewError(errno.InvalidPwdLooks)
	}

	if !c.isStrengthPwd(s) {
		return errno.NewError(errno.InvalidPwdComplex)
	}
	return nil
}

// isSimilarToUserName reports whether the password s equals the username or
// the username reversed. Differing lengths can never be similar.
func (c *Client) isSimilarToUserName(s, user string) bool {
	n := len(s)
	if n != len(user) {
		return false
	}

	// Exact match.
	if s == user {
		return true
	}

	// Match against the reversed username.
	for i, j := 0, n-1; i < n; i, j = i+1, j-1 {
		if s[i] != user[j] {
			return false
		}
	}
	return true
}

// isInWeakPwdDict reports whether s appears in the weak-password dictionary
// file at c.weakPwdPath (one password per line). A UTF-8/UTF-16 BOM is
// stripped before parsing.
// Fix: the original split only on "\r\n", so a dictionary saved with Unix
// ("\n") line endings was treated as one giant entry and never matched.
// Splitting on "\n" and trimming a trailing "\r" handles both conventions.
func (c *Client) isInWeakPwdDict(s string) (bool, error) {
	filename := c.weakPwdPath
	content, err := ioutil.ReadFile(path.Clean(filename))
	if err != nil {
		return false, err
	}

	// Strip any byte-order mark so the first entry compares correctly.
	dec := unicode.BOMOverride(transform.Nop)
	content, _, err = transform.Bytes(dec, content)
	if err != nil {
		return false, err
	}

	for _, line := range strings.Split(string(content), "\n") {
		if strings.TrimSuffix(line, "\r") == s {
			return true, nil
		}
	}
	return false, nil
}

// isStrengthPwd reports whether s contains at least one lowercase letter, one
// uppercase letter, one digit and one special character, and nothing outside
// those four classes.
func (c *Client) isStrengthPwd(s string) bool {
	const specChars = "-~@$#%_^!*+=?"
	var lowerCh, upperCh, digitCh, specCh bool
	for i := 0; i < len(s); i++ {
		switch ch := s[i]; {
		case ch >= 'a' && ch <= 'z':
			lowerCh = true
		case ch >= 'A' && ch <= 'Z':
			upperCh = true
		case ch >= '0' && ch <= '9':
			digitCh = true
		case strings.IndexByte(specChars, ch) >= 0:
			specCh = true
		default:
			// Any character outside the allowed classes fails immediately.
			c.logger.Info("verify pwd strength:", zap.Any("unsupported char", string(ch)))
			return false
		}
	}
	c.logger.Info("verify pwd strength:", zap.Any("lower", lowerCh), zap.Any("upper", upperCh),
		zap.Any("digit", digitCh), zap.Any("spec", specCh))
	return lowerCh && upperCh && digitCh && specCh
}

// DropUser removes the user with the given name. A user with unrestricted
// (admin) authorization cannot be dropped (ErrUserDropSelf).
// Idiom fix: the original used an else branch after a terminating if; the
// flow is flattened into guard clauses with identical behavior.
func (c *Client) DropUser(name string) error {
	u, err := c.User(name)
	if err != nil {
		return err
	}
	if u.AuthorizeUnrestricted() {
		return meta2.ErrUserDropSelf
	}

	return c.retryUntilExec(proto2.Command_DropUserCommand, proto2.E_DropUserCommand_Command,
		&proto2.DropUserCommand{
			Name: proto.String(name),
		},
	)
}

// SetPrivilege sets a privilege for the given user on the given database.
func (c *Client) SetPrivilege(username, database string, p originql.Privilege) error {
	cmd := &proto2.SetPrivilegeCommand{
		Username:  proto.String(username),
		Database:  proto.String(database),
		Privilege: proto.Int32(int32(p)),
	}
	return c.retryUntilExec(proto2.Command_SetPrivilegeCommand, proto2.E_SetPrivilegeCommand_Command, cmd)
}

// SetAdminPrivilege sets or unsets admin privilege for the given username.
func (c *Client) SetAdminPrivilege(username string, admin bool) error {
	cmd := &proto2.SetAdminPrivilegeCommand{
		Username: proto.String(username),
		Admin:    proto.Bool(admin),
	}
	return c.retryUntilExec(proto2.Command_SetAdminPrivilegeCommand, proto2.E_SetAdminPrivilegeCommand_Command, cmd)
}

// UserPrivileges returns the privileges for a user mapped by database name.
func (c *Client) UserPrivileges(username string) (map[string]originql.Privilege, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.UserPrivileges(username)
}

// UserPrivilege returns the privilege for the given user on the given database.
func (c *Client) UserPrivilege(username, database string) (*originql.Privilege, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.UserPrivilege(username, database)
}

// AdminUserExists returns true if any user has admin privilege.
func (c *Client) AdminUserExists() bool {
	c.mu.RLock()
	exists := c.cacheData.AdminUserExist()
	c.mu.RUnlock()
	return exists
}

// updateAuthCacheData is the auth-maintenance loop, intended to run as a
// goroutine. It consumes authentication records from c.arChan, expires user
// locks every 500ms, and purges stale auth data hourly, until c.closing is
// closed.
func (c *Client) updateAuthCacheData() {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()
	lockTicker := time.NewTicker(500 * time.Millisecond)
	defer lockTicker.Stop()

	for {
		select {
		case rcd := <-c.arChan:
			// A login attempt arrived; fold it (plus any queued records)
			// into the success/failure caches.
			c.dealOnceAuthRecord(rcd)
		case <-lockTicker.C:
			// Unlock users whose lock window has elapsed.
			c.updateUserLockData()
		case <-ticker.C:
			// Drop records older than the validity window.
			c.clearInvalidAuthData()
		case <-c.closing:
			return
		}
	}
}

// updateUserLockData unlocks users whose lock period has expired. A user is
// locked once it has accumulated maxLoginLimit recorded auth failures; the
// lock expires lockUserTime seconds after the maxLoginLimit-th entry in the
// failure list.
func (c *Client) updateUserLockData() {
	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()

	// Collect users whose lock window has elapsed.
	var expired []string
	for user, rcd := range c.authFailRcds {
		if len(rcd.occurTimeLst) < maxLoginLimit {
			continue
		}
		if time.Since(rcd.occurTimeLst[maxLoginLimit-1]).Seconds() >= lockUserTime {
			expired = append(expired, user)
		}
	}

	// Remove their failure records, which unlocks them.
	for _, user := range expired {
		c.logger.Info("unlock user", zap.String("user", user))
		delete(c.authFailRcds, user)
	}
}

// dealOncAuthSuccRcd folds one successful authentication record into the
// auth caches. Caller must hold c.muAuthData (see dealOnceAuthRecord).
// A success newer than the cached one replaces it, and any failure
// timestamps older than the success are discarded — a later successful login
// invalidates earlier failures for lock accounting.
func (c *Client) dealOncAuthSuccRcd(r *authRcd) {
	// Only successes are handled here; failures go to dealOncAuthFailRcd.
	if !r.result {
		return
	}

	// discard invalid authSuccRcd (older than the cached success)
	if v, ok := c.authSuccRcds[r.user]; ok {
		if v.After(r.occurTime) {
			return
		}
	}
	// record the latest successful authentication
	c.authSuccRcds[r.user] = r.occurTime

	// refresh authFailRcds, clear invalid auth fail rcd.
	if v, ok := c.authFailRcds[r.user]; ok {
		var tmp []time.Time
		existInvalidRcd := false
		for _, tv := range v.occurTimeLst {
			// Failures that predate this success no longer count.
			if r.occurTime.After(tv) {
				existInvalidRcd = true
				continue
			}
			tmp = append(tmp, tv)
		}
		if !existInvalidRcd {
			return
		}
		if len(tmp) == 0 {
			// All failures invalidated: drop the record entirely.
			delete(c.authFailRcds, r.user)
			return
		}
		v.occurTimeLst = tmp
		c.authFailRcds[r.user] = v
	}
}

// dealOncAuthFailRcd folds one failed authentication record into the failure
// cache. Caller must hold c.muAuthData (see dealOnceAuthRecord).
// Failures that predate the user's last successful login are discarded.
// The per-user list is kept sorted oldest-first and capped at maxLoginLimit
// entries. NOTE(review): the cap keeps the EARLIEST failures, anchoring the
// unlock window (see updateUserLockData) at the maxLoginLimit-th oldest
// failure rather than the newest — confirm this is intended.
func (c *Client) dealOncAuthFailRcd(r *authRcd) {
	// Only failures are handled here; successes go to dealOncAuthSuccRcd.
	if r.result {
		return
	}

	// discard invalid authFailRcd (older than the last recorded success)
	if v, ok := c.authSuccRcds[r.user]; ok {
		if v.After(r.occurTime) {
			return
		}
	}

	// refresh auth_Fail_Cache data
	if v, ok := c.authFailRcds[r.user]; ok {
		v.occurTimeLst = append(v.occurTimeLst, r.occurTime)
		// Sort ascending (oldest first).
		sort.Slice(v.occurTimeLst, func(i, j int) bool {
			return v.occurTimeLst[j].After(v.occurTimeLst[i])
		})
		if len(v.occurTimeLst) > maxLoginLimit {
			v.occurTimeLst = v.occurTimeLst[0:maxLoginLimit]
		}
		c.authFailRcds[r.user] = v
	} else {
		// First recorded failure for this user.
		c.authFailRcds[r.user] = authFailCache{
			user: r.user, occurTimeLst: []time.Time{r.occurTime}}
	}
	c.logger.Info(r.user, zap.Any("auth fail count", len(c.authFailRcds[r.user].occurTimeLst)))
}

// dealOnceAuthRecord processes the record r plus up to maxDealLimit-1 further
// records already queued on c.arChan in a single pass under c.muAuthData, to
// amortize lock acquisition.
// Bug fix: the original `break` inside the select's default case only exited
// the select, not the drain loop, so the loop kept spinning (re-entering the
// select) until i reached realNum. A labeled break exits the loop as
// intended once the channel is empty.
func (c *Client) dealOnceAuthRecord(r *authRcd) {
	const maxDealLimit = 20
	realNum := len(c.arChan)
	if realNum > maxDealLimit-1 {
		realNum = maxDealLimit - 1
	}

	rcdLst := []*authRcd{r}
drain:
	for i := 1; i < realNum; i++ {
		select {
		case rcd := <-c.arChan:
			rcdLst = append(rcdLst, rcd)
		default:
			// Channel drained early: stop instead of spinning.
			break drain
		}
	}

	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()
	for _, rcd := range rcdLst {
		c.dealOncAuthSuccRcd(rcd)
		c.dealOncAuthFailRcd(rcd)
	}
}

// clearInvalidAuthData drops auth records — both per-user failure lists and
// last-success timestamps — older than maxLoginValidTime seconds.
func (c *Client) clearInvalidAuthData() {
	c.muAuthData.Lock()
	defer c.muAuthData.Unlock()

	// Expire failure records whose newest entry is past the validity window.
	var stale []string
	for user, rcd := range c.authFailRcds {
		if time.Since(rcd.occurTimeLst[len(rcd.occurTimeLst)-1]).Seconds() >= maxLoginValidTime {
			c.logger.Info("delete invalid authFailrcd", zap.String("user", user), zap.Any("time data", rcd.occurTimeLst))
			stale = append(stale, user)
		}
	}
	for _, user := range stale {
		delete(c.authFailRcds, user)
	}

	// Expire stale last-success timestamps the same way.
	stale = stale[:0]
	for user, t := range c.authSuccRcds {
		if time.Since(t).Seconds() >= maxLoginValidTime {
			c.logger.Info("delete invalid asc rcd", zap.String("user", user), zap.Any("time", t))
			stale = append(stale, user)
		}
	}
	for _, user := range stale {
		delete(c.authSuccRcds, user)
	}
}

// isLockedUser reports whether user u is currently locked out, i.e. has at
// least maxLoginLimit recorded authentication failures.
func (c *Client) isLockedUser(u string) bool {
	c.muAuthData.RLock()
	defer c.muAuthData.RUnlock()

	v, ok := c.authFailRcds[u]
	if ok && len(v.occurTimeLst) >= maxLoginLimit {
		c.logger.Info("The user has been locked.", zap.String("user", u))
		return true
	}
	return false
}

// Authenticate returns a UserInfo if the username and password match an existing entry.
// Flow: reject locked users, look the user up in the cached meta data, try
// the per-user auth cache (salt+hash of the last good password) to skip the
// expensive full comparison, then fall back to CompareHashAndPlainPwd and
// refill the cache on success. Every attempt — success or failure — is
// pushed to c.arChan for the lock-tracking goroutine via the deferred send.
func (c *Client) Authenticate(username, password string) (u meta2.User, e error) {
	//verify user lock or not
	if c.isLockedUser(username) {
		return nil, meta2.ErrUserLocked
	}
	//record auth, refresh cache
	defer func() {
		// e is the named return: nil means this attempt succeeded.
		rst := e == nil
		c.arChan <- &authRcd{user: username, result: rst, occurTime: time.Now()}
	}()

	// Find user.
	c.mu.RLock()
	userInfo := c.cacheData.GetUser(username)
	c.mu.RUnlock()
	if userInfo == nil {
		return nil, meta2.ErrUserNotFound
	}
	// The stored hash's leading flag selects the hash algorithm version.
	hashVer := userInfo.Hash[:len(hashAlgoVerOne)]

	// Check the local auth cache first.
	c.mu.RLock()
	au, ok := c.authCache[username]
	c.mu.RUnlock()
	if ok {
		// verify the password using the cached salt and hash
		if bytes.Equal(c.encryptWithSalt(au.salt, password, hashVer), au.hash) {
			return userInfo, nil
		}

		// fall through to requiring a full bcrypt hash for invalid passwords
	}

	// Compare password with user hash.
	if err := c.CompareHashAndPlainPwd(userInfo.Hash, password); err != nil {
		return nil, meta2.ErrAuthenticate
	}

	// generate a salt and hash of the password for the cache
	salt, hashed, err := c.saltedEncryptByVer(password, hashVer)
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	// bhash records the stored hash the cache entry was derived from, so
	// updateAuthCache can invalidate it when the password changes.
	c.authCache[username] = authUser{salt: salt, hash: hashed, bhash: userInfo.Hash}
	c.mu.Unlock()
	return userInfo, nil
}

// UserCount returns the number of users stored.
func (c *Client) UserCount() int {
	c.mu.RLock()
	n := len(c.cacheData.Users)
	c.mu.RUnlock()
	return n
}

// ShardIDs returns a sorted list of all shard IDs across every database,
// retention policy and shard group.
func (c *Client) ShardIDs() []uint64 {
	var ids []uint64
	c.mu.RLock()
	for _, dbi := range c.cacheData.Databases {
		for _, rpi := range dbi.RetentionPolicies {
			for _, sgi := range rpi.ShardGroups {
				for _, si := range sgi.Shards {
					ids = append(ids, si.ID)
				}
			}
		}
	}
	c.mu.RUnlock()
	// Sort after releasing the lock; ids is private here.
	sort.Sort(uint64Slice(ids))
	return ids
}

// ShardGroupsByTimeRange returns all shard groups on a database and policy
// that may contain data for the specified time range. Deleted groups are
// skipped.
func (c *Client) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta2.ShardGroupInfo, err error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Find retention policy.
	rpi, err := c.cacheData.RetentionPolicy(database, policy)
	if err != nil {
		return nil, err
	}
	if rpi == nil {
		return nil, meta2.ErrRetentionPolicyNotFound(policy)
	}

	groups := make([]meta2.ShardGroupInfo, 0, len(rpi.ShardGroups))
	for i := range rpi.ShardGroups {
		g := rpi.ShardGroups[i]
		if g.Deleted() || !g.Overlaps(min, max) {
			continue
		}
		groups = append(groups, g)
	}
	return groups, nil
}

// ShardsByTimeRange returns a slice of shards that may contain data in the
// time range, gathered from every measurement source's shard groups.
func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta2.ShardInfo, err error) {
	seen := make(map[*meta2.ShardInfo]struct{})
	for _, mm := range sources.Measurements() {
		groups, err := c.ShardGroupsByTimeRange(mm.Database, mm.RetentionPolicy, tmin, tmax)
		if err != nil {
			return nil, err
		}
		for gi := range groups {
			for si := range groups[gi].Shards {
				seen[&groups[gi].Shards[si]] = struct{}{}
			}
		}
	}

	shards := make([]meta2.ShardInfo, 0, len(seen))
	for sh := range seen {
		shards = append(shards, *sh)
	}
	return shards, nil
}

// DropShard deletes a shard by ID.
// NOTE(review): the deletion is applied to a clone of the cached data and the
// clone is then discarded — no command is sent to the meta server and the
// cache is not replaced, so this appears to be a no-op placeholder. Confirm
// intent before relying on it.
func (c *Client) DropShard(id uint64) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	data.DropShard(id)
	return nil
}

// TruncateShardGroups truncates any shard group that could contain timestamps beyond t.
// NOTE(review): like DropShard, this mutates a clone of the cache that is
// then discarded — no command is sent and the cache is unchanged, so the
// truncation has no external effect. Confirm intent.
func (c *Client) TruncateShardGroups(t time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	data := c.cacheData.Clone()
	data.TruncateShardGroups(t)
	return nil
}

// PruneShardGroups remove deleted shard groups from the data store.
// A group is kept when it is not marked deleted (zero DeletedAt) or its
// deletion falls outside the expiration cutoff.
// NOTE(review): like DropShard, this operates on a clone of the cache that is
// then discarded, so the pruning has no external effect. Confirm intent.
func (c *Client) PruneShardGroups() error {
	expiration := time.Now().Add(ShardGroupDeletedExpiration)
	c.mu.Lock()
	defer c.mu.Unlock()
	data := c.cacheData.Clone()
	for i, d := range data.Databases {
		for j, rp := range d.RetentionPolicies {
			var remainingShardGroups []meta2.ShardGroupInfo
			for _, sgi := range rp.ShardGroups {
				// Keep groups not deleted, or deleted after the cutoff.
				if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) {
					remainingShardGroups = append(remainingShardGroups, sgi)
					continue
				}
			}
			data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
		}
	}
	return nil
}

// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
// Fast path: if the cache already has a group covering the timestamp it is
// returned without contacting the meta server. Otherwise the create command
// is executed and the group covering the timestamp is looked up afresh.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*meta2.ShardGroupInfo, error) {
	c.mu.RLock()
	sg, tier, err := c.cacheData.GetTierOfShardGroup(database, policy, timestamp, c.ShardTier)
	if err != nil {
		c.mu.RUnlock()
		return nil, err
	}
	if sg != nil {
		// Timestamp already covered; nothing to create.
		c.mu.RUnlock()
		return sg, nil
	}
	c.mu.RUnlock()

	cmd := &proto2.CreateShardGroupCommand{
		Database:  proto.String(database),
		Policy:    proto.String(policy),
		Timestamp: proto.Int64(timestamp.UnixNano()),
		ShardTier: proto.Uint64(tier),
	}

	if err := c.retryUntilExec(proto2.Command_CreateShardGroupCommand, proto2.E_CreateShardGroupCommand_Command, cmd); err != nil {
		return nil, err
	}

	// Re-read the (now updated) retention policy to locate the new group.
	rpi, err := c.RetentionPolicy(database, policy)
	if err != nil {
		return nil, err
	} else if rpi == nil || rpi.MarkDeleted {
		return nil, errors.New("retention policy deleted after shard group created")
	}

	return rpi.ShardGroupByTimestamp(timestamp), nil
}

// GetAliveShards returns the indexes into sgi.Shards whose first owner pt is
// online in the given database's pt view.
func (c *Client) GetAliveShards(database string, sgi *meta2.ShardGroupInfo) []int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	alive := make([]int, 0, c.cacheData.ClusterPtNum)
	for i := range sgi.Shards {
		owner := sgi.Shards[i].Owners[0]
		if c.cacheData.PtView[database][owner].Status == meta2.Online {
			alive = append(alive, i)
		}
	}
	return alive
}

// DeleteIndexGroup executes the delete-index-group command for index group id
// under database/policy. It does not wait for the local cache to catch up.
func (c *Client) DeleteIndexGroup(database, policy string, id uint64) error {
	_, err := c.retryExec(proto2.Command_DeleteIndexGroupCommand, proto2.E_DeleteIndexGroupCommand_Command,
		&proto2.DeleteIndexGroupCommand{
			Database:     proto.String(database),
			Policy:       proto.String(policy),
			IndexGroupID: proto.Uint64(id),
		})
	return err
}

// DeleteShardGroup removes a shard group from a database and retention policy
// by id. It does not wait for the local cache to catch up.
func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
	_, err := c.retryExec(proto2.Command_DeleteShardGroupCommand, proto2.E_DeleteShardGroupCommand_Command,
		&proto2.DeleteShardGroupCommand{
			Database:     proto.String(database),
			Policy:       proto.String(policy),
			ShardGroupID: proto.Uint64(id),
		})
	return err
}

// PruneGroupsCommand sends the prune-groups command for either a shard group
// (shardGroup=true) or an index group with the given id.
// PyStore sends the command to PyMeta; there is no need to waitForIndex.
// Idiom fix: `if err != nil { return err }; return nil` collapsed to
// `return err` — identical behavior.
func (c *Client) PruneGroupsCommand(shardGroup bool, id uint64) error {
	cmd := &proto2.PruneGroupsCommand{
		ShardGroup: proto.Bool(shardGroup),
		ID:         proto.Uint64(id),
	}
	_, err := c.retryExec(proto2.Command_PruneGroupsCommand, proto2.E_PruneGroupsCommand_Command, cmd)
	return err
}

// UpdateShardInfoTier sends the command updating the tier of shard shardID in
// dbName/rpName. It does not wait for the local cache to catch up.
// Idiom fix: `if err != nil { return err }; return nil` collapsed to
// `return err` — identical behavior.
func (c *Client) UpdateShardInfoTier(shardID uint64, tier uint64, dbName, rpName string) error {
	cmd := &proto2.UpdateShardInfoTierCommand{
		ShardID: proto.Uint64(shardID),
		Tier:    proto.Uint64(tier),
		DbName:  proto.String(dbName),
		RpName:  proto.String(rpName),
	}
	_, err := c.retryExec(proto2.Command_UpdateShardInfoTierCommand, proto2.E_UpdateShardInfoTierCommand_Command, cmd)
	return err
}

// ShardOwner returns the database, retention policy and shard group that own
// the given shard ID. Deleted shard groups are skipped. All results are zero
// values when the shard is unknown.
func (c *Client) ShardOwner(shardID uint64) (database, policy string, sgi *meta2.ShardGroupInfo) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	for dbIdx := range c.cacheData.Databases {
		db := &c.cacheData.Databases[dbIdx]
		for rpIdx := range db.RetentionPolicies {
			rp := &db.RetentionPolicies[rpIdx]
			for sgIdx := range rp.ShardGroups {
				sg := &rp.ShardGroups[sgIdx]
				if sg.Deleted() {
					continue
				}
				for shIdx := range sg.Shards {
					if sg.Shards[shIdx].ID == shardID {
						return db.Name, rp.Name, sg
					}
				}
			}
		}
	}
	return
}

// JoinMetaServer will add the passed in tcpAddr to the raft peers and add a
// MetaNode to the metastore. It retries every meta server indefinitely,
// sleeping errSleep between full passes, so it returns only once a server
// accepts the join (or an un-retryable marshal error occurs).
// Bug fix: the original called time.Sleep(errSleep) while still holding
// c.mu.RLock(), blocking all writers for the sleep duration on every failed
// pass. The server count is now snapshotted and the lock released before
// sleeping.
func (c *Client) JoinMetaServer(httpAddr, rpcAddr, tcpAddr string) (*meta2.NodeInfo, error) {
	node := &meta2.NodeInfo{
		Host:    httpAddr,
		RPCAddr: rpcAddr,
		TCPHost: tcpAddr,
	}
	b, err := json.Marshal(node)
	if err != nil {
		return nil, err
	}

	currentServer := 0
	for {
		// Snapshot the server count under the lock, then release it before
		// any sleeping.
		c.mu.RLock()
		numServers := len(c.metaServers)
		c.mu.RUnlock()

		if currentServer >= numServers {
			// We've tried every server, wait a second before
			// making another pass.
			time.Sleep(errSleep)
			currentServer = 0
		}

		callback := &JoinCallback{NodeInfo: node}
		msg := message.NewMetaMessage(message.UpdateRequestMessage, &message.UpdateRequest{Body: b})
		if err = c.SendRPCMsg(currentServer, msg, callback); err != nil {
			currentServer++
			continue
		}
		// Successfully joined.
		break
	}
	return node, nil
}

// CreateMetaNode registers a meta node with the given HTTP and TCP addresses
// and records its assigned node ID on the client.
func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*meta2.NodeInfo, error) {
	cmd := &proto2.CreateMetaNodeCommand{
		HTTPAddr: proto.String(httpAddr),
		TCPAddr:  proto.String(tcpAddr),
		Rand:     proto.Uint64(uint64(rand.Int63())),
	}
	if err := c.retryUntilExec(proto2.Command_CreateMetaNodeCommand, proto2.E_CreateMetaNodeCommand_Command, cmd); err != nil {
		return nil, err
	}

	// Look the node back up by its HTTP address to learn its assigned ID.
	node := c.MetaNodeByAddr(httpAddr)
	if node == nil {
		return nil, errors.New("new meta node not found")
	}
	c.nodeID = node.ID
	return node, nil
}

// DeleteMetaNode removes the meta node with the given id.
func (c *Client) DeleteMetaNode(id uint64) error {
	cmd := &proto2.DeleteMetaNodeCommand{ID: proto.Uint64(id)}
	return c.retryUntilExec(proto2.Command_DeleteMetaNodeCommand, proto2.E_DeleteMetaNodeCommand_Command, cmd)
}

// CreateSubscription creates a subscription against the given database and retention policy.
func (c *Client) CreateSubscription(database, rp, name, mode string, destinations []string) error {
	cmd := &proto2.CreateSubscriptionCommand{
		Database:        proto.String(database),
		RetentionPolicy: proto.String(rp),
		Name:            proto.String(name),
		Mode:            proto.String(mode),
		Destinations:    destinations,
	}
	return c.retryUntilExec(proto2.Command_CreateSubscriptionCommand, proto2.E_CreateSubscriptionCommand_Command, cmd)
}

// DropSubscription removes the named subscription from the given database and retention policy.
func (c *Client) DropSubscription(database, rp, name string) error {
	cmd := &proto2.DropSubscriptionCommand{
		Database:        proto.String(database),
		RetentionPolicy: proto.String(rp),
		Name:            proto.String(name),
	}
	return c.retryUntilExec(proto2.Command_DropSubscriptionCommand, proto2.E_DropSubscriptionCommand_Command, cmd)
}

// SetData overwrites the underlying data in the meta store.
func (c *Client) SetData(data *meta2.Data) error {
	cmd := &proto2.SetDataCommand{Data: data.Marshal()}
	return c.retryUntilExec(proto2.Command_SetDataCommand, proto2.E_SetDataCommand_Command, cmd)
}

// Data returns a Clone of the underlying data in the meta store.
func (c *Client) Data() meta2.Data {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return *c.cacheData.Clone()
}

// WaitForDataChanged returns a channel that will get a stuct{} when
// the metastore data has changed.
// The channel is registered with the poll loop via c.changed and is closed —
// not sent on — when new cache data is installed (see pollForUpdates).
// NOTE(review): the send on c.changed blocks until the poll loop drains it.
func (c *Client) WaitForDataChanged() chan struct{} {
	ch := make(chan struct{})

	c.changed <- ch
	return ch
}

// MarshalBinary returns a binary representation of the underlying data.
func (c *Client) MarshalBinary() ([]byte, error) {
	c.mu.RLock()
	b, err := c.cacheData.MarshalBinary()
	c.mu.RUnlock()
	return b, err
}

// index returns the raft index of the currently cached meta data.
func (c *Client) index() uint64 {
	c.mu.RLock()
	idx := c.cacheData.Index
	c.mu.RUnlock()
	return idx
}

// retryUntilExec executes the command via retryExec and then blocks until the
// local cache has caught up to the raft index the command was applied at.
func (c *Client) retryUntilExec(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
	index, err := c.retryExec(typ, desc, value)
	if err != nil {
		return err
	}
	// Wait for the poll loop to install data at (or past) this index.
	c.waitForIndex(index)
	return nil
}

// retryExec sends the command to meta servers until one accepts it, the
// RetryExecTimeout elapses, or the client closes. Returns the raft index the
// command was applied at.
// Retry policy: "node is not the leader" moves straight to the next server;
// errCommand errors are permanent and returned immediately; any other error
// sleeps errSleep before the next attempt.
func (c *Client) retryExec(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) (index uint64, err error) {
	// TODO do not use index to check cache data is newest
	tries := 0
	currentServer := connectedServer
	timeout := time.After(RetryExecTimeout)

	for {
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-timeout:
			c.mu.RUnlock()
			return c.index(), meta2.ErrCommandTimeout
		case <-c.closing:
			c.mu.RUnlock()
			// Closing is not an error: return the current index.
			return c.index(), nil
		default:
			// we're still open, continue on
		}
		c.mu.RUnlock()

		// build the url to hit the redirect server or the next metaserver
		var server string

		// Wrap around once every server has been tried.
		c.mu.RLock()
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server = c.metaServers[currentServer]
		c.mu.RUnlock()

		index, err = c.exec(currentServer, typ, desc, value, message.ExecuteRequestMessage)

		if err == nil {
			return index, nil
		}
		tries++
		currentServer++

		// Not-the-leader: try the next server immediately, no sleep.
		if strings.Contains(err.Error(), "node is not the leader") {
			continue
		}

		// Command-level errors are permanent; retrying cannot help.
		if _, ok := err.(errCommand); ok {
			return c.index(), err
		}
		c.logger.Info("retryUntilExec retry", zap.String("server", server), zap.Any("type", typ),
			zap.Int("tries", tries), zap.Error(err))
		time.Sleep(errSleep)
	}
}

// getMetaMsg wraps body in a MetaMessage of the requested type. Only execute
// and report requests are supported; any other type yields nil.
func getMetaMsg(msgTy uint8, body []byte) *message.MetaMessage {
	if msgTy == message.ExecuteRequestMessage {
		return message.NewMetaMessage(msgTy, &message.ExecuteRequest{Body: body})
	}
	if msgTy == message.ReportRequestMessage {
		return message.NewMetaMessage(msgTy, &message.ReportRequest{Body: body})
	}
	return nil
}

// exec marshals a single command and sends it to the meta server at index
// currentServer, returning the raft index the command was applied at. A
// failure to set the command extension is a programmer error and panics.
func (c *Client) exec(currentServer int, typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}, msgTy uint8) (index uint64, err error) {
	cmd := &proto2.Command{Type: &typ}
	if err := proto.SetExtension(cmd, desc, value); err != nil {
		panic(err)
	}

	body, err := proto.Marshal(cmd)
	if err != nil {
		return 0, err
	}

	callback := &ExecuteAndReportCallback{Typ: msgTy}
	if err = c.SendRPCMsg(currentServer, getMetaMsg(msgTy, body), callback); err != nil {
		return 0, err
	}
	// A command-level error reported by the server takes precedence.
	if callback.ErrCommand != nil {
		return 0, *callback.ErrCommand
	}
	return callback.Index, nil
}

// waitForIndex blocks until the cached meta data has reached raft index idx.
// It registers for a change notification BEFORE checking the index, so an
// update landing between the check and the wait cannot be missed.
func (c *Client) waitForIndex(idx uint64) {
	for {
		ch := c.WaitForDataChanged()
		c.mu.RLock()
		if c.cacheData.Index >= idx {
			c.mu.RUnlock()
			return
		}
		c.mu.RUnlock()
		// Block until the poll loop closes ch on the next data change.
		<-ch
	}
}

// Role: sql/store/meta
// pollForUpdates runs as a goroutine and keeps the local cache in sync with
// the meta servers: it repeatedly fetches a snapshot newer than the current
// index, installs it under the write lock, refreshes the auth cache, and
// closes every change-notification channel registered via
// WaitForDataChanged.
func (c *Client) pollForUpdates(role Role) {
	for {
		data := c.retryUntilSnapshot(role, c.index())
		if data == nil {
			// this will only be nil if the client has been closed,
			// so we can exit out
			c.logger.Error("client has been closed")
			return
		}
		// update the data and notify of the change
		c.mu.Lock()
		idx := c.cacheData.Index
		// Only install strictly newer data; stale snapshots are dropped.
		if idx < data.Index {
			c.cacheData = data
			c.updateAuthCache()
			// Wake every waiter registered via WaitForDataChanged.
			for len(c.changed) > 0 {
				notifyC := <-c.changed
				close(notifyC)
			}
		}
		c.mu.Unlock()
	}
}

// getNode asks the meta server at currentServer for the start info of a data
// node with the given write and query hosts.
func (c *Client) getNode(currentServer int, writeHost, queryHost string) (*meta2.NodeStartInfo, error) {
	callback := &CreateNodeCallback{NodeStartInfo: &meta2.NodeStartInfo{}}
	req := &message.CreateNodeRequest{WriteHost: writeHost, QueryHost: queryHost}
	if err := c.SendRPCMsg(currentServer, message.NewMetaMessage(message.CreateNodeRequestMessage, req), callback); err != nil {
		return nil, err
	}
	return callback.NodeStartInfo, nil
}

// getShardInfo marshals cmd and queries the meta server at currentServer for
// shard info, returning the raw response bytes.
func (c *Client) getShardInfo(currentServer int, cmd *proto2.Command) ([]byte, error) {
	body, err := proto.Marshal(cmd)
	if err != nil {
		return nil, err
	}
	callback := &GetShardInfoCallback{}
	msg := message.NewMetaMessage(message.GetShardInfoRequestMessage, &message.GetShardInfoRequest{Body: body})
	if err = c.SendRPCMsg(currentServer, msg, callback); err != nil {
		return nil, err
	}
	return callback.Data, nil
}

// getSnapshot fetches a meta-data snapshot newer than index from the server
// at currentServer. Returns (nil, nil) when the server has nothing newer.
// Payload size and unmarshal duration are recorded in meta statistics.
func (c *Client) getSnapshot(role Role, currentServer int, index uint64) (*meta2.Data, error) {
	c.logger.Debug("getting snapshot from start")

	callback := &SnapshotCallback{}
	msg := message.NewMetaMessage(message.SnapshotRequestMessage, &message.SnapshotRequest{Role: int(role), Index: index})
	if err := c.SendRPCMsg(currentServer, msg, callback); err != nil {
		return nil, err
	}
	c.logger.Debug("getting snapshot from end")

	// An empty payload means no newer data is available.
	if len(callback.Data) == 0 {
		return nil, nil
	}

	stat := statistics.NewMetaStatistics()
	stat.AddSnapshotTotal(1)
	stat.AddSnapshotDataSize(int64(len(callback.Data)))

	data := &meta2.Data{}
	start := time.Now()
	if err := data.UnmarshalBinary(callback.Data); err != nil {
		return nil, err
	}
	stat.AddSnapshotUnmarshalDuration(time.Since(start).Milliseconds())

	return data, nil
}

// getDownSampleInfo fetches down-sample metadata bytes from the meta server
// at currentServer.
func (c *Client) getDownSampleInfo(currentServer int) ([]byte, error) {
	callback := &GetDownSampleInfoCallback{}
	req := &message.GetDownSampleInfoRequest{}
	if err := c.SendRPCMsg(currentServer, message.NewMetaMessage(message.GetDownSampleInfoRequestMessage, req), callback); err != nil {
		return nil, err
	}
	return callback.Data, nil
}

// getRpMstInfos fetches measurement metadata bytes for dbName/rpName,
// filtered by dataTypes, from the meta server at currentServer.
func (c *Client) getRpMstInfos(currentServer int, dbName, rpName string, dataTypes []int64) ([]byte, error) {
	callback := &GetRpMstInfoCallback{}
	req := &message.GetRpMstInfosRequest{
		DbName:    dbName,
		RpName:    rpName,
		DataTypes: dataTypes,
	}
	if err := c.SendRPCMsg(currentServer, message.NewMetaMessage(message.GetRpMstInfosRequestMessage, req), callback); err != nil {
		return nil, err
	}
	return callback.Data, nil
}

// SendRPCMsg sends msg to the meta server at index currentServer over a
// fresh meta transport and blocks until callback completes or the RPC times
// out. On success the server index is remembered as the connected server so
// later retries start there.
func (c *Client) SendRPCMsg(currentServer int, msg *message.MetaMessage, callback transport.Callback) error {
	trans, err := transport.NewMetaTransport(uint64(currentServer), spdy.MetaRequest, callback)
	if err != nil {
		return err
	}
	trans.SetTimeout(RPCReqTimeout)

	if err := trans.Send(msg); err != nil {
		return err
	}
	if err := trans.Wait(); err != nil {
		return err
	}

	refreshConnectedServer(currentServer)
	return nil
}

// Peers returns the TCPHost addresses of all the metaservers
func (c *Client) Peers() []string {

	var peers Peers
	// query each server and keep track of who their peers are
	for currentServer := range c.metaServers {
		callback := &PeersCallback{}
		msg := message.NewMetaMessage(message.PeersRequestMessage, &message.PeersRequest{})
		err := c.SendRPCMsg(currentServer, msg, callback)
		if err != nil {
			continue
		}
		//// This meta-server might not be ready to answer, continue on
		p := callback.Peers
		peers = peers.Append(p...)
	}

	// Return the unique set of peer addresses
	return []string(peers.Unique())
}

// updateAuthCache rebuilds the authentication cache, keeping cached entries
// only for users that still exist in the meta data and whose password hash
// has not changed.
func (c *Client) updateAuthCache() {
	next := make(map[string]authUser, len(c.authCache))

	for _, ui := range c.cacheData.Users {
		cached, ok := c.authCache[ui.Name]
		if ok && cached.bhash == ui.Hash {
			next[ui.Name] = cached
		}
	}

	c.authCache = next
}

// url builds the base HTTP or HTTPS URL for the given meta server address,
// depending on whether TLS is enabled.
func (c *Client) url(server string) string {
	scheme := "http"
	if c.tls {
		scheme = "https"
	}
	return fmt.Sprintf("%s://%s", scheme, server)
}

var connectedServer int

// retryUntilSnapshot polls the meta servers in turn until one returns a
// snapshot newer than idx; it returns nil only when the client is closed.
// NOTE(review): when a server answers successfully but with no data, the
// loop retries the same server immediately without sleeping — presumably the
// snapshot request blocks server-side (long poll); confirm before changing.
func (c *Client) retryUntilSnapshot(role Role, idx uint64) *meta2.Data {
	currentServer := connectedServer
	for {
		// get the index to look from and the server to poll
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:
			// we're still open, continue on
		}

		// wrap around when we walked past the end of the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server := c.metaServers[currentServer]
		c.mu.RUnlock()

		data, err := c.getSnapshot(role, currentServer, idx)

		if err == nil && data != nil {
			return data
		} else if err == nil && data == nil {
			continue
		}

		// on error: log, back off briefly, then try the next server
		c.logger.Debug("failure getting snapshot from", zap.String("server", server), zap.Error(err))
		time.Sleep(errSleep)

		currentServer++
	}
}

// RetryDownSampleInfo fetches the raw down-sample policy payload from the
// meta servers, rotating to the next server after each failure, until a
// server answers, the client closes, or the total retry budget (one
// HttpReqTimeout per server) is exhausted; in the last case the final error
// is returned alongside whatever data was obtained (nil).
func (c *Client) RetryDownSampleInfo() ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var downSampleInfo []byte
	for {
		c.mu.RLock()
		// exit if the client was closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:
		}
		// wrap around the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		downSampleInfo, err = c.getDownSampleInfo(currentServer)
		if err == nil {
			break
		}
		// give up once every server has had a full timeout's worth of tries
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)
		currentServer++
	}
	return downSampleInfo, err
}

// RetryMstInfosInRp fetches the raw measurement/field info of retention
// policy rpName in dbName (filtered by dataTypes) from the meta servers,
// rotating to the next server after each failure, until a server answers,
// the client closes, or the retry budget (one HttpReqTimeout per server) is
// exhausted; in the last case the final error is returned.
func (c *Client) RetryMstInfosInRp(dbName, rpName string, dataTypes []int64) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var mstInfos []byte
	for {
		c.mu.RLock()
		// exit if the client was closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:
		}
		// wrap around the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		mstInfos, err = c.getRpMstInfos(currentServer, dbName, rpName, dataTypes)
		if err == nil {
			break
		}
		// give up once every server has had a full timeout's worth of tries
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)
		currentServer++
	}
	return mstInfos, err
}

// RetryGetShardAuxInfo sends cmd to the meta servers, rotating to the next
// server after each transient failure, until a server answers, the command
// definitively fails with ShardMetaNotFound, the client closes, or the
// retry budget (one HttpReqTimeout per server) is spent. It returns the raw
// response payload and the last error.
func (c *Client) RetryGetShardAuxInfo(cmd *proto2.Command) ([]byte, error) {
	startTime := time.Now()
	currentServer := connectedServer
	var err error
	var shardAuxInfo []byte
	for {
		c.mu.RLock()
		// exit if the client was closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil, meta2.ErrClientClosed
		default:

		}

		// wrap around the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		shardAuxInfo, err = c.getShardInfo(currentServer, cmd)
		if err == nil {
			break
		}
		// a missing shard is a definitive answer, not a transient failure
		if errno.Equal(err, errno.ShardMetaNotFound) {
			c.logger.Error("get shard info failed", zap.String("cmd", cmd.String()), zap.Error(err))
			break
		}

		if time.Since(startTime).Nanoseconds() > int64(len(c.metaServers))*HttpReqTimeout.Nanoseconds() {
			c.logger.Error("get shard info timeout", zap.String("cmd", cmd.String()), zap.Error(err))
			break
		}
		c.logger.Error("retry get shard info", zap.String("cmd", cmd.String()), zap.Error(err))
		time.Sleep(errSleep)

		currentServer++
	}
	return shardAuxInfo, err
}

// GetShardRangeInfo returns the time-range meta information of shard shardID
// in the given database and retention policy.
func (c *Client) GetShardRangeInfo(db string, rp string, shardID uint64) (*meta2.ShardTimeRangeInfo, error) {
	val := &proto2.TimeRangeCommand{
		Database: proto.String(db),
		Policy:   proto.String(rp),
		ShardID:  proto.Uint64(shardID),
	}

	t := proto2.Command_TimeRangeCommand
	cmd := &proto2.Command{Type: &t}
	// SetExtension only fails on a type mismatch, which is a programming bug.
	if err := proto.SetExtension(cmd, proto2.E_TimeRangeCommand_Command, val); err != nil {
		panic(err)
	}
	b, err := c.RetryGetShardAuxInfo(cmd)
	if err != nil {
		return nil, err
	}

	rangeInfo := &meta2.ShardTimeRangeInfo{}
	if err := rangeInfo.UnmarshalBinary(b); err != nil {
		return nil, err
	}
	// return an explicit nil on success instead of echoing the (always nil)
	// outer err, matching GetShardDurationInfo
	return rangeInfo, nil
}

// GetShardDurationInfo asks the meta servers for shard duration information
// newer than index for this node and decodes the response.
func (c *Client) GetShardDurationInfo(index uint64) (*meta2.ShardDurationResponse, error) {
	typ := proto2.Command_ShardDurationCommand
	cmd := &proto2.Command{Type: &typ}
	val := &proto2.ShardDurationCommand{Index: proto.Uint64(index), Pts: nil, NodeId: proto.Uint64(c.nodeID)}
	if err := proto.SetExtension(cmd, proto2.E_ShardDurationCommand_Command, val); err != nil {
		panic(err)
	}

	body, err := c.RetryGetShardAuxInfo(cmd)
	if err != nil {
		return nil, err
	}

	resp := &meta2.ShardDurationResponse{}
	if err := resp.UnmarshalBinary(body); err != nil {
		return nil, err
	}
	return resp, nil
}

// MetaServers returns the meta server addresses this client talks to.
func (c *Client) MetaServers() []string {
	return c.metaServers
}

// InitMetaClient wires the client to the given meta servers and, when
// storageNodeInfo is provided, registers this process as a data node.
// It returns the assigned node ID, the logical clock, and any registration
// error.
func (c *Client) InitMetaClient(joinPeers []string, tlsEn bool, storageNodeInfo *StorageNodeInfo) (uint64, uint64, error) {
	// It's the first time starting up and we need to either join
	// the cluster or initialize this node as the first member.
	// A meta server address list is mandatory.
	if len(joinPeers) == 0 {
		return 0, 0, fmt.Errorf("no meta node hosts given")
	}

	// join this node to the cluster
	c.SetMetaServers(joinPeers)
	c.SetTLS(tlsEn)

	var nid uint64
	var err error
	var clock uint64
	if storageNodeInfo != nil {
		nid, clock, err = c.CreateDataNode(storageNodeInfo.InsertAddr, storageNodeInfo.SelectAddr)
	}

	return nid, clock, err
}

// retryReport executes a report command of the given type against the meta
// servers, rotating to the next server after a failure, until it succeeds,
// the client closes, or RetryReportTimeout elapses. "Not the leader" errors
// are retried immediately on the next server; errCommand failures (the
// server executed and rejected the command) are returned as-is.
func (c *Client) retryReport(typ proto2.Command_Type, desc *proto.ExtensionDesc, value interface{}) error {
	tries := 0
	currentServer := connectedServer
	timeout := time.After(RetryReportTimeout)

	for {
		c.mu.RLock()
		// exit if we're closed
		select {
		case <-timeout:
			c.mu.RUnlock()
			return meta2.ErrCommandTimeout
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:
			// we're still open, continue on
		}
		c.mu.RUnlock()

		// build the url to hit the redirect server or the next metaserver
		var server string
		c.mu.RLock()
		// wrap around the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		server = c.metaServers[currentServer]
		c.mu.RUnlock()

		_, err := c.exec(currentServer, typ, desc, value, message.ReportRequestMessage)

		if err == nil {
			return nil
		}
		tries++
		currentServer++

		// follower answered: try the next server right away, no backoff
		if strings.Contains(err.Error(), "node is not the leader") {
			continue
		}

		// command-level failure is final; retrying would not help
		if _, ok := err.(errCommand); ok {
			return err
		}
		c.logger.Info("retryUntilExec retry", zap.String("server", server), zap.Any("type", typ),
			zap.Int("tries", tries), zap.Error(err))
		time.Sleep(errSleep)
	}
}

// ReportShardLoads reports per-DBPT shard load statistics to the meta servers.
func (c *Client) ReportShardLoads(dbPTStats []*proto2.DBPtStatus) error {
	return c.retryReport(proto2.Command_ReportShardsCommand, proto2.E_ReportShardsLoadCommand_Command,
		&proto2.ReportShardsLoadCommand{DBPTStat: dbPTStats})
}

// Measurements returns the sorted, de-duplicated origin names of the
// measurements in database that match ms. It returns a nil slice when
// nothing matches.
func (c *Client) Measurements(database string, ms influxql.Measurements) ([]string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	mstMaps, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	var measurements []string
	for _, mi := range mstMaps {
		measurements = append(measurements, mi.OriginName())
	}

	if len(measurements) == 0 {
		return measurements, nil
	}
	sort.Strings(measurements)

	// De-duplicate the sorted slice in place. The previous loop advanced
	// past the element that replaced a removed duplicate, so runs of three
	// or more equal names kept extra copies.
	uniq := measurements[:1]
	for _, m := range measurements[1:] {
		if m != uniq[len(uniq)-1] {
			uniq = append(uniq, m)
		}
	}
	return uniq, nil
}

// MatchMeasurements returns the measurements in database matching ms,
// keyed by measurement name, taking the client read lock.
func (c *Client) MatchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.matchMeasurements(database, ms)
}

// matchMeasurements collects, across every retention policy of database, the
// measurements matching ms. Callers must hold c.mu.
func (c *Client) matchMeasurements(database string, ms influxql.Measurements) (map[string]*meta2.MeasurementInfo, error) {
	dbi, err := c.cacheData.GetDatabase(database)
	if err != nil {
		return nil, err
	}

	matched := make(map[string]*meta2.MeasurementInfo)
	dbi.WalkRetentionPolicy(func(rp *meta2.RetentionPolicyInfo) {
		rp.MatchMeasurements(ms, matched)
	})
	return matched, nil
}

// QueryTagKeys returns, per matched measurement name, the set of tag keys
// satisfying cond. It returns nil when no measurement matches.
func (c *Client) QueryTagKeys(database string, ms influxql.Measurements, cond influxql.Expr) (map[string]map[string]struct{}, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	mis, err := c.matchMeasurements(database, ms)
	if err != nil {
		return nil, err
	}
	if len(mis) == 0 {
		return nil, nil
	}

	// map[measurement name] map[tag key] struct{}
	tagKeys := make(map[string]map[string]struct{}, len(mis))
	for _, m := range mis {
		if _, ok := tagKeys[m.Name]; !ok {
			tagKeys[m.Name] = make(map[string]struct{})
		}
		m.MatchTagKeys(cond, tagKeys)
	}

	return tagKeys, nil
}

// NewDownSamplePolicy registers the down-sample policy named name on the
// given database with the meta servers.
func (c *Client) NewDownSamplePolicy(database, name string, info *meta2.DownSamplePolicyInfo) error {
	return c.retryUntilExec(proto2.Command_CreateDownSamplePolicyCommand, proto2.E_CreateDownSamplePolicyCommand_Command,
		&proto2.CreateDownSamplePolicyCommand{
			Database:             proto.String(database),
			Name:                 proto.String(name),
			DownSamplePolicyInfo: info.Marshal(),
		})
}

// DropDownSamplePolicy removes the down-sample policy named name from
// database; when dropAll is true every policy of the database is removed.
func (c *Client) DropDownSamplePolicy(database, name string, dropAll bool) error {
	return c.retryUntilExec(proto2.Command_DropDownSamplePolicyCommand, proto2.E_DropDownSamplePolicyCommand_Command,
		&proto2.DropDownSamplePolicyCommand{
			Database: proto.String(database),
			RpName:   proto.String(name),
			DropAll:  proto.Bool(dropAll),
		})
}

// ShowDownSamplePolicies renders the down-sample policies of database from
// the local meta cache as result rows.
func (c *Client) ShowDownSamplePolicies(database string) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowDownSamplePolicies(database)
}

// GetDownSamplePolicies retrieves all down-sample policies from the meta
// servers and unmarshals them. The previous version also built a proto
// command (GetDownSamplePolicyCommand) that was never sent anywhere; that
// dead code has been removed — RetryDownSampleInfo carries the request.
func (c *Client) GetDownSamplePolicies() (*meta2.DownSamplePoliciesInfoWithDbRp, error) {
	b, err := c.RetryDownSampleInfo()
	if err != nil {
		return nil, err
	}
	return c.unmarshalDownSamplePolicies(b)
}

// unmarshalDownSamplePolicies decodes b into down-sample policy information.
func (c *Client) unmarshalDownSamplePolicies(b []byte) (*meta2.DownSamplePoliciesInfoWithDbRp, error) {
	info := &meta2.DownSamplePoliciesInfoWithDbRp{}
	err := info.UnmarshalBinary(b)
	if err != nil {
		return nil, err
	}
	return info, nil
}

// GetMstInfoWithInRp retrieves measurement/field information for retention
// policy rpName in dbName (filtered by dataTypes) from the meta servers and
// unmarshals it. The previous version also built a proto command
// (GetMeasurementInfoWithinSameRpCommand) that was never sent anywhere; that
// dead code has been removed — RetryMstInfosInRp carries the request.
func (c *Client) GetMstInfoWithInRp(dbName, rpName string, dataTypes []int64) (*meta2.RpMeasurementsFieldsInfo, error) {
	b, err := c.RetryMstInfosInRp(dbName, rpName, dataTypes)
	if err != nil {
		return nil, err
	}
	return c.unmarshalMstInfoWithInRp(b)
}

// unmarshalMstInfoWithInRp decodes b into measurement/field information.
func (c *Client) unmarshalMstInfoWithInRp(b []byte) (*meta2.RpMeasurementsFieldsInfo, error) {
	info := &meta2.RpMeasurementsFieldsInfo{}
	err := info.UnmarshalBinary(b)
	if err != nil {
		return nil, err
	}
	return info, nil
}

// UpdateUserInfo starts a background goroutine that periodically refreshes
// the cached user meta data from the meta servers until the client closes.
func (c *Client) UpdateUserInfo() {
	go c.retryGetUserInfo()
}

// retryGetUserInfo periodically (every RetryGetUserInfoTimeout) fetches user
// meta data from one meta server and swaps it into the cache when it is
// newer than the cached index. It runs until the client closes.
func (c *Client) retryGetUserInfo() {
	currentServer := connectedServer
	ticker := time.NewTicker(RetryGetUserInfoTimeout)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			c.mu.RLock()
			// guard against a stale connectedServer index after the meta
			// server list changed (sibling retry loops do the same)
			if currentServer >= len(c.metaServers) {
				currentServer = 0
			}
			server := c.metaServers[currentServer]
			c.mu.RUnlock()
			data, err := c.getUserInfo(currentServer, c.index())
			if err != nil {
				c.logger.Error("failure getting userinfo from", zap.String("server", server), zap.Error(err))
			}
			if data != nil {
				// only adopt data that is strictly newer than the cache
				idx := c.index()
				if idx < data.Index {
					c.mu.Lock()
					c.cacheData = data
					c.mu.Unlock()
				}
			}
		case <-c.closing:
			return
		}
	}
}

// getUserInfo fetches user meta data newer than index from the meta server
// at currentServer; it returns (nil, nil) when nothing newer exists.
func (c *Client) getUserInfo(currentServer int, index uint64) (*meta2.Data, error) {
	cb := &GetUserInfoCallback{}
	req := message.NewMetaMessage(message.GetUserInfoRequestMessage, &message.GetUserInfoRequest{Index: index})
	if err := c.SendRPCMsg(currentServer, req, cb); err != nil {
		return nil, err
	}

	// an empty payload means the server has nothing newer than index
	if len(cb.Data) == 0 {
		return nil, nil
	}

	data := &meta2.Data{}
	if err := data.UnmarshalBinary(cb.Data); err != nil {
		return nil, err
	}
	return data, nil
}

// UpdateShardDownSampleInfo persists the down-sample state of the shard
// identified by Ident in the cluster meta data.
func (c *Client) UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error {
	cmd := &proto2.UpdateShardDownSampleInfoCommand{Ident: Ident.Marshal()}
	_, err := c.retryExec(proto2.Command_UpdateShardDownSampleInfoCommand, proto2.E_UpdateShardDownSampleInfoCommand_Command, cmd)
	return err
}

// GetStreamInfos returns the cached stream definitions keyed by stream name.
// NOTE(review): the internal map is returned without copying; callers must
// treat it as read-only.
func (c *Client) GetStreamInfos() map[string]*meta2.StreamInfo {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.Streams
}

// GetDstStreamInfos get the stream info whose db and rip of the data are the same as the db and rp of the source table of the stream
// Note: make sure dstSis is initialized
// GetDstStreamInfos get the stream info whose db and rip of the data are the same as the db and rp of the source table of the stream
// Note: make sure dstSis is initialized
func (c *Client) GetDstStreamInfos(db, rp string, dstSis *[]*meta2.StreamInfo) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if len(c.cacheData.Streams) == 0 {
		return false
	}
	i := 0
	// expand to full capacity so the existing backing array is reused in place
	*dstSis = (*dstSis)[:cap(*dstSis)]
	for _, si := range c.cacheData.Streams {
		if si.SrcMst.Database == db && si.SrcMst.RetentionPolicy == rp {
			// grow only once the reused capacity is exhausted
			if len(*dstSis) < i+1 {
				*dstSis = append(*dstSis, si)
			} else {
				(*dstSis)[i] = si
			}
			i++
		}
	}
	// shrink to the number of matches actually written
	*dstSis = (*dstSis)[:i]
	return len(*dstSis) > 0
}

// UpdateStreamMstSchema creates/updates the schema of the stream destination
// measurement mst from the stream's SELECT statement: every call field
// becomes a typed field (named "<call>_<arg>" or the alias when present) and
// every plain dimension becomes a tag.
func (c *Client) UpdateStreamMstSchema(database string, retentionPolicy string, mst string, stmt *influxql.SelectStatement) error {
	fieldToCreate := make([]*proto2.FieldSchema, 0, len(stmt.Fields)+len(stmt.Dimensions))
	fields := stmt.Fields
	for i := range fields {
		// every projection must be a call (aggregation) in a stream statement
		f, ok := fields[i].Expr.(*influxql.Call)
		if !ok {
			return errors.New("unexpected call function")
		}
		// NOTE(review): assumes the call's first argument is a field
		// reference (*influxql.VarRef); another argument kind would panic here
		if fields[i].Alias == "" {
			fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
				FieldName: proto.String(f.Name + "_" + f.Args[0].(*influxql.VarRef).Val),
			})
		} else {
			fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
				FieldName: proto.String(fields[i].Alias),
			})
		}
		valuer := influxql.TypeValuerEval{
			TypeMapper: DefaultTypeMapper,
		}
		t, e := valuer.EvalType(f, false)
		if e != nil {
			return e
		}
		// exactly one entry was appended for index i above, so
		// fieldToCreate[i] is the schema of fields[i]
		switch t {
		case influxql.Float:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Float)
		case influxql.Integer:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Int)
		case influxql.Boolean:
			// NOTE(review): Boolean maps to Field_Type_Int rather than a
			// boolean field type — confirm this is intentional
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_Int)
		case influxql.String:
			fieldToCreate[i].FieldType = proto.Int32(influx.Field_Type_String)
		default:
			return errors.New("unexpected call type")
		}
	}
	dims := stmt.Dimensions
	for i := range dims {
		// only plain tag references become schema entries; other dimension
		// expressions (e.g. time intervals) are skipped
		d, ok := dims[i].Expr.(*influxql.VarRef)
		if !ok {
			continue
		}
		fieldToCreate = append(fieldToCreate, &proto2.FieldSchema{
			FieldName: proto.String(d.Val),
			FieldType: proto.Int32(influx.Field_Type_Tag),
		})
	}
	return c.UpdateSchema(database, retentionPolicy, mst, fieldToCreate)
}

// CreateStreamPolicy registers the stream described by info in the cluster
// meta data.
func (c *Client) CreateStreamPolicy(info *meta2.StreamInfo) error {
	return c.retryUntilExec(proto2.Command_CreateStreamCommand, proto2.E_CreateStreamCommand_Command,
		&proto2.CreateStreamCommand{StreamInfo: info.Marshal()})
}

// GetStreamInfosStore fetches the stream definitions from the meta servers
// (store-side path), retrying across servers via RetryGetStreamInfosStore.
func (c *Client) GetStreamInfosStore() map[string]*meta2.StreamInfo {
	return c.RetryGetStreamInfosStore()
}

// RetryGetStreamInfosStore polls the meta servers for the stream definition
// map, rotating to the next server after each failure, until one answers,
// the client closes, or the retry budget (one HttpReqTimeout per server) is
// exhausted; it returns nil in the latter two cases.
func (c *Client) RetryGetStreamInfosStore() map[string]*meta2.StreamInfo {
	startTime := time.Now()
	currentServer := connectedServer
	var infos map[string]*meta2.StreamInfo
	for {
		c.mu.RLock()
		// exit if the client was closed
		select {
		case <-c.closing:
			c.mu.RUnlock()
			return nil
		default:

		}

		// wrap around the server list
		if currentServer >= len(c.metaServers) {
			currentServer = 0
		}
		c.mu.RUnlock()
		infos = c.GetStreamInfosForStore(currentServer)
		if infos != nil {
			break
		}

		// give up once every server has had a full timeout's worth of tries
		if time.Since(startTime).Seconds() > float64(len(c.metaServers))*HttpReqTimeout.Seconds() {
			break
		}
		time.Sleep(errSleep)

		currentServer++
	}
	return infos
}

// GetStreamInfosForStore fetches all stream definitions from the meta server
// at currentServer, returning nil on any RPC or decode failure.
func (c *Client) GetStreamInfosForStore(currentServer int) map[string]*meta2.StreamInfo {
	cb := &GetStreamInfoCallback{}
	req := message.NewMetaMessage(message.GetStreamInfoRequestMessage, &message.GetStreamInfoRequest{Body: nil})
	if err := c.SendRPCMsg(currentServer, req, cb); err != nil {
		c.logger.Error("GetStreamInfosForStore SendRPCMsg fail", zap.Error(err))
		return nil
	}

	pb := &proto2.StreamInfos{}
	if err := proto.Unmarshal(cb.Data, pb); err != nil {
		c.logger.Error("GetStreamInfosForStore Unmarshal fail", zap.Error(err))
		return nil
	}

	infos := make(map[string]*meta2.StreamInfo)
	for _, v := range pb.GetInfos() {
		si := &meta2.StreamInfo{}
		si.Unmarshal(v)
		infos[si.Name] = si
	}
	return infos
}

// ShowStreams renders stream information from the local meta cache as result
// rows; showAll is forwarded to the cache (presumably it includes streams of
// all databases — confirm against meta2.Data.ShowStreams).
func (c *Client) ShowStreams(database string, showAll bool) (models.Rows, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.cacheData.ShowStreams(database, showAll)
}

// DropStream removes the stream named name from the cluster meta data.
// The previous version returned nil inside the error branch, silently
// swallowing execution failures; the error is now propagated to the caller.
func (c *Client) DropStream(name string) error {
	cmd := &proto2.DropStreamCommand{
		Name: proto.String(name),
	}
	return c.retryUntilExec(proto2.Command_DropStreamCommand, proto2.E_DropStreamCommand_Command, cmd)
}

var VerifyNodeEn = true

// verifyDataNodeStatus periodically (every 10s) verifies this data node's
// status with the meta servers while HA is enabled. After three consecutive
// verification failures the process kills itself via Suicide so the cluster
// can fail over; any success resets the failure counter.
func (c *Client) verifyDataNodeStatus() {
	if !config.GetHaEnable() {
		return
	}

	tries := 0
	for {
		select {
		case <-c.closing:
			return
		default:
			time.Sleep(10 * time.Second)
			// verification can be disabled globally via VerifyNodeEn
			if !VerifyNodeEn {
				continue
			}
			if err := c.retryVerifyDataNodeStatus(); err != nil {
				tries++
				c.logger.Error("Verify retry", zap.Int("tries", tries), zap.Error(err))
				if tries >= 3 {
					c.Suicide(err)
				}
				continue
			}
			// success: reset the consecutive-failure counter
			tries = 0
		}
	}
}

// retryVerifyDataNodeStatus asks the meta servers to confirm that this data
// node is still considered alive.
func (c *Client) retryVerifyDataNodeStatus() error {
	return c.retryUntilExec(proto2.Command_VerifyDataNodeCommand, proto2.E_VerifyDataNodeCommand_Command,
		&proto2.VerifyDataNodeCommand{NodeID: proto.Uint64(c.nodeID)})
}

// Suicide terminates the current process with SIGKILL after logging err and
// waiting briefly; it is invoked when this data node fails verification and
// must leave the cluster immediately.
func (c *Client) Suicide(err error) {
	c.logger.Error("Suicide for fault data node", zap.Error(err))
	// give the log message a moment to be flushed before the kill
	time.Sleep(errSleep)
	if e := syscall.Kill(syscall.Getpid(), syscall.SIGKILL); e != nil {
		panic(fmt.Sprintf("FATAL: cannot send SIGKILL to itself: %v", e))
	}
}

// refreshConnectedServer records currentServer as the meta server that most
// recently answered successfully, so subsequent retry loops start there.
func refreshConnectedServer(currentServer int) {
	if connectedServer != currentServer {
		connectedServer = currentServer
	}
}

// Peers is a list of meta server peer addresses.
type Peers []string

// Append adds p to the list and returns the de-duplicated result.
func (peers Peers) Append(p ...string) Peers {
	return append(peers, p...).Unique()
}

// Unique returns the distinct peer addresses, in unspecified order.
func (peers Peers) Unique() Peers {
	seen := make(map[string]struct{}, len(peers))
	for _, p := range peers {
		seen[p] = struct{}{}
	}

	var out Peers
	for p := range seen {
		out = append(out, p)
	}
	return out
}

// Contains reports whether peer is present in the list.
func (peers Peers) Contains(peer string) bool {
	for i := range peers {
		if peers[i] == peer {
			return true
		}
	}
	return false
}

// ErrRedirect reports that a meta server redirected the request to the
// node at Host.
type ErrRedirect struct {
	Host string
}

// Error implements the error interface.
func (e ErrRedirect) Error() string {
	return "redirect to " + e.Host
}

// errCommand wraps an error message produced by executing a meta command;
// such failures are final and are not retried.
type errCommand struct {
	msg string
}

// Error implements the error interface.
func (e errCommand) Error() string { return e.msg }

// uint64Slice implements sort.Interface for a slice of uint64 values.
type uint64Slice []uint64

func (a uint64Slice) Len() int { return len(a) }

func (a uint64Slice) Swap(i, j int) {
	a[i], a[j] = a[j], a[i]
}

func (a uint64Slice) Less(i, j int) bool {
	return a[i] < a[j]
}

// DefaultHost substitutes hostname for the host part of addr when addr has
// an empty or unspecified host ("", "0.0.0.0" or "::"), keeping the port.
// It returns an error when addr is not a valid host:port pair.
func DefaultHost(hostname, addr string) (string, error) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return "", err
	}

	switch host {
	case "", "0.0.0.0", "::":
		return net.JoinHostPort(hostname, port), nil
	default:
		return addr, nil
	}
}
